Home | History | Annotate | Download | only in Lib
      1 """Thread module emulating a subset of Java's threading model."""
      2 
      3 import os as _os
      4 import sys as _sys
      5 import _thread
      6 
      7 from time import monotonic as _time
      8 from traceback import format_exc as _format_exc
      9 from _weakrefset import WeakSet
     10 from itertools import islice as _islice, count as _count
     11 try:
     12     from _collections import deque as _deque
     13 except ImportError:
     14     from collections import deque as _deque
     15 
# Note regarding PEP 8 compliant names
#  This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. Those original names are not in any imminent danger of
# being deprecated (even for Py3k), so this module provides them as
# aliases for the PEP 8 compliant names.
# Note that using the new PEP 8 compliant names facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.
     25 
# Names exported by "from threading import *".
__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
           'enumerate', 'main_thread', 'TIMEOUT_MAX',
           'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
           'setprofile', 'settrace', 'local', 'stack_size']

# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error
# Use the RLock type supplied by _thread when it has one; a value of None
# makes the RLock() factory fall back to the pure-Python _PyRLock.
try:
    _CRLock = _thread.RLock
except AttributeError:
    _CRLock = None
TIMEOUT_MAX = _thread.TIMEOUT_MAX
# Everything needed from _thread is bound above; drop the module name itself.
del _thread
     44 
     45 
# Support for profile and trace hooks

# Module-wide hook functions; both start as None (no hook installed).
_profile_hook = None    # replaced by setprofile()
_trace_hook = None      # replaced by settrace()
     50 
     51 def setprofile(func):
     52     """Set a profile function for all threads started from the threading module.
     53 
     54     The func will be passed to sys.setprofile() for each thread, before its
     55     run() method is called.
     56 
     57     """
     58     global _profile_hook
     59     _profile_hook = func
     60 
     61 def settrace(func):
     62     """Set a trace function for all threads started from the threading module.
     63 
     64     The func will be passed to sys.settrace() for each thread, before its run()
     65     method is called.
     66 
     67     """
     68     global _trace_hook
     69     _trace_hook = func
     70 
     71 # Synchronization classes
     72 
     73 Lock = _allocate_lock
     74 
     75 def RLock(*args, **kwargs):
     76     """Factory function that returns a new reentrant lock.
     77 
     78     A reentrant lock must be released by the thread that acquired it. Once a
     79     thread has acquired a reentrant lock, the same thread may acquire it again
     80     without blocking; the thread must release it once for each time it has
     81     acquired it.
     82 
     83     """
     84     if _CRLock is None:
     85         return _PyRLock(*args, **kwargs)
     86     return _CRLock(*args, **kwargs)
     87 
     88 class _RLock:
     89     """This class implements reentrant lock objects.
     90 
     91     A reentrant lock must be released by the thread that acquired it. Once a
     92     thread has acquired a reentrant lock, the same thread may acquire it
     93     again without blocking; the thread must release it once for each time it
     94     has acquired it.
     95 
     96     """
     97 
     98     def __init__(self):
     99         self._block = _allocate_lock()
    100         self._owner = None
    101         self._count = 0
    102 
    103     def __repr__(self):
    104         owner = self._owner
    105         try:
    106             owner = _active[owner].name
    107         except KeyError:
    108             pass
    109         return "<%s %s.%s object owner=%r count=%d at %s>" % (
    110             "locked" if self._block.locked() else "unlocked",
    111             self.__class__.__module__,
    112             self.__class__.__qualname__,
    113             owner,
    114             self._count,
    115             hex(id(self))
    116         )
    117 
    118     def acquire(self, blocking=True, timeout=-1):
    119         """Acquire a lock, blocking or non-blocking.
    120 
    121         When invoked without arguments: if this thread already owns the lock,
    122         increment the recursion level by one, and return immediately. Otherwise,
    123         if another thread owns the lock, block until the lock is unlocked. Once
    124         the lock is unlocked (not owned by any thread), then grab ownership, set
    125         the recursion level to one, and return. If more than one thread is
    126         blocked waiting until the lock is unlocked, only one at a time will be
    127         able to grab ownership of the lock. There is no return value in this
    128         case.
    129 
    130         When invoked with the blocking argument set to true, do the same thing
    131         as when called without arguments, and return true.
    132 
    133         When invoked with the blocking argument set to false, do not block. If a
    134         call without an argument would block, return false immediately;
    135         otherwise, do the same thing as when called without arguments, and
    136         return true.
    137 
    138         When invoked with the floating-point timeout argument set to a positive
    139         value, block for at most the number of seconds specified by timeout
    140         and as long as the lock cannot be acquired.  Return true if the lock has
    141         been acquired, false if the timeout has elapsed.
    142 
    143         """
    144         me = get_ident()
    145         if self._owner == me:
    146             self._count += 1
    147             return 1
    148         rc = self._block.acquire(blocking, timeout)
    149         if rc:
    150             self._owner = me
    151             self._count = 1
    152         return rc
    153 
    154     __enter__ = acquire
    155 
    156     def release(self):
    157         """Release a lock, decrementing the recursion level.
    158 
    159         If after the decrement it is zero, reset the lock to unlocked (not owned
    160         by any thread), and if any other threads are blocked waiting for the
    161         lock to become unlocked, allow exactly one of them to proceed. If after
    162         the decrement the recursion level is still nonzero, the lock remains
    163         locked and owned by the calling thread.
    164 
    165         Only call this method when the calling thread owns the lock. A
    166         RuntimeError is raised if this method is called when the lock is
    167         unlocked.
    168 
    169         There is no return value.
    170 
    171         """
    172         if self._owner != get_ident():
    173             raise RuntimeError("cannot release un-acquired lock")
    174         self._count = count = self._count - 1
    175         if not count:
    176             self._owner = None
    177             self._block.release()
    178 
    179     def __exit__(self, t, v, tb):
    180         self.release()
    181 
    182     # Internal methods used by condition variables
    183 
    184     def _acquire_restore(self, state):
    185         self._block.acquire()
    186         self._count, self._owner = state
    187 
    188     def _release_save(self):
    189         if self._count == 0:
    190             raise RuntimeError("cannot release un-acquired lock")
    191         count = self._count
    192         self._count = 0
    193         owner = self._owner
    194         self._owner = None
    195         self._block.release()
    196         return (count, owner)
    197 
    198     def _is_owned(self):
    199         return self._owner == get_ident()
    200 
    201 _PyRLock = _RLock
    202 
    203 
    204 class Condition:
    205     """Class that implements a condition variable.
    206 
    207     A condition variable allows one or more threads to wait until they are
    208     notified by another thread.
    209 
    210     If the lock argument is given and not None, it must be a Lock or RLock
    211     object, and it is used as the underlying lock. Otherwise, a new RLock object
    212     is created and used as the underlying lock.
    213 
    214     """
    215 
    216     def __init__(self, lock=None):
    217         if lock is None:
    218             lock = RLock()
    219         self._lock = lock
    220         # Export the lock's acquire() and release() methods
    221         self.acquire = lock.acquire
    222         self.release = lock.release
    223         # If the lock defines _release_save() and/or _acquire_restore(),
    224         # these override the default implementations (which just call
    225         # release() and acquire() on the lock).  Ditto for _is_owned().
    226         try:
    227             self._release_save = lock._release_save
    228         except AttributeError:
    229             pass
    230         try:
    231             self._acquire_restore = lock._acquire_restore
    232         except AttributeError:
    233             pass
    234         try:
    235             self._is_owned = lock._is_owned
    236         except AttributeError:
    237             pass
    238         self._waiters = _deque()
    239 
    240     def __enter__(self):
    241         return self._lock.__enter__()
    242 
    243     def __exit__(self, *args):
    244         return self._lock.__exit__(*args)
    245 
    246     def __repr__(self):
    247         return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
    248 
    249     def _release_save(self):
    250         self._lock.release()           # No state to save
    251 
    252     def _acquire_restore(self, x):
    253         self._lock.acquire()           # Ignore saved state
    254 
    255     def _is_owned(self):
    256         # Return True if lock is owned by current_thread.
    257         # This method is called only if _lock doesn't have _is_owned().
    258         if self._lock.acquire(0):
    259             self._lock.release()
    260             return False
    261         else:
    262             return True
    263 
    264     def wait(self, timeout=None):
    265         """Wait until notified or until a timeout occurs.
    266 
    267         If the calling thread has not acquired the lock when this method is
    268         called, a RuntimeError is raised.
    269 
    270         This method releases the underlying lock, and then blocks until it is
    271         awakened by a notify() or notify_all() call for the same condition
    272         variable in another thread, or until the optional timeout occurs. Once
    273         awakened or timed out, it re-acquires the lock and returns.
    274 
    275         When the timeout argument is present and not None, it should be a
    276         floating point number specifying a timeout for the operation in seconds
    277         (or fractions thereof).
    278 
    279         When the underlying lock is an RLock, it is not released using its
    280         release() method, since this may not actually unlock the lock when it
    281         was acquired multiple times recursively. Instead, an internal interface
    282         of the RLock class is used, which really unlocks it even when it has
    283         been recursively acquired several times. Another internal interface is
    284         then used to restore the recursion level when the lock is reacquired.
    285 
    286         """
    287         if not self._is_owned():
    288             raise RuntimeError("cannot wait on un-acquired lock")
    289         waiter = _allocate_lock()
    290         waiter.acquire()
    291         self._waiters.append(waiter)
    292         saved_state = self._release_save()
    293         gotit = False
    294         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    295             if timeout is None:
    296                 waiter.acquire()
    297                 gotit = True
    298             else:
    299                 if timeout > 0:
    300                     gotit = waiter.acquire(True, timeout)
    301                 else:
    302                     gotit = waiter.acquire(False)
    303             return gotit
    304         finally:
    305             self._acquire_restore(saved_state)
    306             if not gotit:
    307                 try:
    308                     self._waiters.remove(waiter)
    309                 except ValueError:
    310                     pass
    311 
    312     def wait_for(self, predicate, timeout=None):
    313         """Wait until a condition evaluates to True.
    314 
    315         predicate should be a callable which result will be interpreted as a
    316         boolean value.  A timeout may be provided giving the maximum time to
    317         wait.
    318 
    319         """
    320         endtime = None
    321         waittime = timeout
    322         result = predicate()
    323         while not result:
    324             if waittime is not None:
    325                 if endtime is None:
    326                     endtime = _time() + waittime
    327                 else:
    328                     waittime = endtime - _time()
    329                     if waittime <= 0:
    330                         break
    331             self.wait(waittime)
    332             result = predicate()
    333         return result
    334 
    335     def notify(self, n=1):
    336         """Wake up one or more threads waiting on this condition, if any.
    337 
    338         If the calling thread has not acquired the lock when this method is
    339         called, a RuntimeError is raised.
    340 
    341         This method wakes up at most n of the threads waiting for the condition
    342         variable; it is a no-op if no threads are waiting.
    343 
    344         """
    345         if not self._is_owned():
    346             raise RuntimeError("cannot notify on un-acquired lock")
    347         all_waiters = self._waiters
    348         waiters_to_notify = _deque(_islice(all_waiters, n))
    349         if not waiters_to_notify:
    350             return
    351         for waiter in waiters_to_notify:
    352             waiter.release()
    353             try:
    354                 all_waiters.remove(waiter)
    355             except ValueError:
    356                 pass
    357 
    358     def notify_all(self):
    359         """Wake up all threads waiting on this condition.
    360 
    361         If the calling thread has not acquired the lock when this method
    362         is called, a RuntimeError is raised.
    363 
    364         """
    365         self.notify(len(self._waiters))
    366 
    367     notifyAll = notify_all
    368 
    369 
    370 class Semaphore:
    371     """This class implements semaphore objects.
    372 
    373     Semaphores manage a counter representing the number of release() calls minus
    374     the number of acquire() calls, plus an initial value. The acquire() method
    375     blocks if necessary until it can return without making the counter
    376     negative. If not given, value defaults to 1.
    377 
    378     """
    379 
    380     # After Tim Peters' semaphore class, but not quite the same (no maximum)
    381 
    382     def __init__(self, value=1):
    383         if value < 0:
    384             raise ValueError("semaphore initial value must be >= 0")
    385         self._cond = Condition(Lock())
    386         self._value = value
    387 
    388     def acquire(self, blocking=True, timeout=None):
    389         """Acquire a semaphore, decrementing the internal counter by one.
    390 
    391         When invoked without arguments: if the internal counter is larger than
    392         zero on entry, decrement it by one and return immediately. If it is zero
    393         on entry, block, waiting until some other thread has called release() to
    394         make it larger than zero. This is done with proper interlocking so that
    395         if multiple acquire() calls are blocked, release() will wake exactly one
    396         of them up. The implementation may pick one at random, so the order in
    397         which blocked threads are awakened should not be relied on. There is no
    398         return value in this case.
    399 
    400         When invoked with blocking set to true, do the same thing as when called
    401         without arguments, and return true.
    402 
    403         When invoked with blocking set to false, do not block. If a call without
    404         an argument would block, return false immediately; otherwise, do the
    405         same thing as when called without arguments, and return true.
    406 
    407         When invoked with a timeout other than None, it will block for at
    408         most timeout seconds.  If acquire does not complete successfully in
    409         that interval, return false.  Return true otherwise.
    410 
    411         """
    412         if not blocking and timeout is not None:
    413             raise ValueError("can't specify timeout for non-blocking acquire")
    414         rc = False
    415         endtime = None
    416         with self._cond:
    417             while self._value == 0:
    418                 if not blocking:
    419                     break
    420                 if timeout is not None:
    421                     if endtime is None:
    422                         endtime = _time() + timeout
    423                     else:
    424                         timeout = endtime - _time()
    425                         if timeout <= 0:
    426                             break
    427                 self._cond.wait(timeout)
    428             else:
    429                 self._value -= 1
    430                 rc = True
    431         return rc
    432 
    433     __enter__ = acquire
    434 
    435     def release(self):
    436         """Release a semaphore, incrementing the internal counter by one.
    437 
    438         When the counter is zero on entry and another thread is waiting for it
    439         to become larger than zero again, wake up that thread.
    440 
    441         """
    442         with self._cond:
    443             self._value += 1
    444             self._cond.notify()
    445 
    446     def __exit__(self, t, v, tb):
    447         self.release()
    448 
    449 
    450 class BoundedSemaphore(Semaphore):
    451     """Implements a bounded semaphore.
    452 
    453     A bounded semaphore checks to make sure its current value doesn't exceed its
    454     initial value. If it does, ValueError is raised. In most situations
    455     semaphores are used to guard resources with limited capacity.
    456 
    457     If the semaphore is released too many times it's a sign of a bug. If not
    458     given, value defaults to 1.
    459 
    460     Like regular semaphores, bounded semaphores manage a counter representing
    461     the number of release() calls minus the number of acquire() calls, plus an
    462     initial value. The acquire() method blocks if necessary until it can return
    463     without making the counter negative. If not given, value defaults to 1.
    464 
    465     """
    466 
    467     def __init__(self, value=1):
    468         Semaphore.__init__(self, value)
    469         self._initial_value = value
    470 
    471     def release(self):
    472         """Release a semaphore, incrementing the internal counter by one.
    473 
    474         When the counter is zero on entry and another thread is waiting for it
    475         to become larger than zero again, wake up that thread.
    476 
    477         If the number of releases exceeds the number of acquires,
    478         raise a ValueError.
    479 
    480         """
    481         with self._cond:
    482             if self._value >= self._initial_value:
    483                 raise ValueError("Semaphore released too many times")
    484             self._value += 1
    485             self._cond.notify()
    486 
    487 
    488 class Event:
    489     """Class implementing event objects.
    490 
    491     Events manage a flag that can be set to true with the set() method and reset
    492     to false with the clear() method. The wait() method blocks until the flag is
    493     true.  The flag is initially false.
    494 
    495     """
    496 
    497     # After Tim Peters' event class (without is_posted())
    498 
    499     def __init__(self):
    500         self._cond = Condition(Lock())
    501         self._flag = False
    502 
    503     def _reset_internal_locks(self):
    504         # private!  called by Thread._reset_internal_locks by _after_fork()
    505         self._cond.__init__(Lock())
    506 
    507     def is_set(self):
    508         """Return true if and only if the internal flag is true."""
    509         return self._flag
    510 
    511     isSet = is_set
    512 
    513     def set(self):
    514         """Set the internal flag to true.
    515 
    516         All threads waiting for it to become true are awakened. Threads
    517         that call wait() once the flag is true will not block at all.
    518 
    519         """
    520         with self._cond:
    521             self._flag = True
    522             self._cond.notify_all()
    523 
    524     def clear(self):
    525         """Reset the internal flag to false.
    526 
    527         Subsequently, threads calling wait() will block until set() is called to
    528         set the internal flag to true again.
    529 
    530         """
    531         with self._cond:
    532             self._flag = False
    533 
    534     def wait(self, timeout=None):
    535         """Block until the internal flag is true.
    536 
    537         If the internal flag is true on entry, return immediately. Otherwise,
    538         block until another thread calls set() to set the flag to true, or until
    539         the optional timeout occurs.
    540 
    541         When the timeout argument is present and not None, it should be a
    542         floating point number specifying a timeout for the operation in seconds
    543         (or fractions thereof).
    544 
    545         This method returns the internal flag on exit, so it will always return
    546         True except if a timeout is given and the operation times out.
    547 
    548         """
    549         with self._cond:
    550             signaled = self._flag
    551             if not signaled:
    552                 signaled = self._cond.wait(timeout)
    553             return signaled
    554 
    555 
    556 # A barrier class.  Inspired in part by the pthread_barrier_* api and
    557 # the CyclicBarrier class from Java.  See
    558 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
    559 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
    560 #        CyclicBarrier.html
    561 # for information.
    562 # We maintain two main states, 'filling' and 'draining' enabling the barrier
    563 # to be cyclic.  Threads are not allowed into it until it has fully drained
    564 # since the previous cycle.  In addition, a 'resetting' state exists which is
    565 # similar to 'draining' except that threads leave with a BrokenBarrierError,
    566 # and a 'broken' state in which all threads get the exception.
    567 class Barrier:
    568     """Implements a Barrier.
    569 
    570     Useful for synchronizing a fixed number of threads at known synchronization
    571     points.  Threads block on 'wait()' and are simultaneously awoken once they
    572     have all made that call.
    573 
    574     """
    575 
    576     def __init__(self, parties, action=None, timeout=None):
    577         """Create a barrier, initialised to 'parties' threads.
    578 
    579         'action' is a callable which, when supplied, will be called by one of
    580         the threads after they have all entered the barrier and just prior to
    581         releasing them all. If a 'timeout' is provided, it is used as the
    582         default for all subsequent 'wait()' calls.
    583 
    584         """
    585         self._cond = Condition(Lock())
    586         self._action = action
    587         self._timeout = timeout
    588         self._parties = parties
    589         self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
    590         self._count = 0
    591 
    592     def wait(self, timeout=None):
    593         """Wait for the barrier.
    594 
    595         When the specified number of threads have started waiting, they are all
    596         simultaneously awoken. If an 'action' was provided for the barrier, one
    597         of the threads will have executed that callback prior to returning.
    598         Returns an individual index number from 0 to 'parties-1'.
    599 
    600         """
    601         if timeout is None:
    602             timeout = self._timeout
    603         with self._cond:
    604             self._enter() # Block while the barrier drains.
    605             index = self._count
    606             self._count += 1
    607             try:
    608                 if index + 1 == self._parties:
    609                     # We release the barrier
    610                     self._release()
    611                 else:
    612                     # We wait until someone releases us
    613                     self._wait(timeout)
    614                 return index
    615             finally:
    616                 self._count -= 1
    617                 # Wake up any threads waiting for barrier to drain.
    618                 self._exit()
    619 
    620     # Block until the barrier is ready for us, or raise an exception
    621     # if it is broken.
    622     def _enter(self):
    623         while self._state in (-1, 1):
    624             # It is draining or resetting, wait until done
    625             self._cond.wait()
    626         #see if the barrier is in a broken state
    627         if self._state < 0:
    628             raise BrokenBarrierError
    629         assert self._state == 0
    630 
    631     # Optionally run the 'action' and release the threads waiting
    632     # in the barrier.
    633     def _release(self):
    634         try:
    635             if self._action:
    636                 self._action()
    637             # enter draining state
    638             self._state = 1
    639             self._cond.notify_all()
    640         except:
    641             #an exception during the _action handler.  Break and reraise
    642             self._break()
    643             raise
    644 
    645     # Wait in the barrier until we are released.  Raise an exception
    646     # if the barrier is reset or broken.
    647     def _wait(self, timeout):
    648         if not self._cond.wait_for(lambda : self._state != 0, timeout):
    649             #timed out.  Break the barrier
    650             self._break()
    651             raise BrokenBarrierError
    652         if self._state < 0:
    653             raise BrokenBarrierError
    654         assert self._state == 1
    655 
    656     # If we are the last thread to exit the barrier, signal any threads
    657     # waiting for the barrier to drain.
    658     def _exit(self):
    659         if self._count == 0:
    660             if self._state in (-1, 1):
    661                 #resetting or draining
    662                 self._state = 0
    663                 self._cond.notify_all()
    664 
    665     def reset(self):
    666         """Reset the barrier to the initial state.
    667 
    668         Any threads currently waiting will get the BrokenBarrier exception
    669         raised.
    670 
    671         """
    672         with self._cond:
    673             if self._count > 0:
    674                 if self._state == 0:
    675                     #reset the barrier, waking up threads
    676                     self._state = -1
    677                 elif self._state == -2:
    678                     #was broken, set it to reset state
    679                     #which clears when the last thread exits
    680                     self._state = -1
    681             else:
    682                 self._state = 0
    683             self._cond.notify_all()
    684 
    685     def abort(self):
    686         """Place the barrier into a 'broken' state.
    687 
    688         Useful in case of error.  Any currently waiting threads and threads
    689         attempting to 'wait()' will have BrokenBarrierError raised.
    690 
    691         """
    692         with self._cond:
    693             self._break()
    694 
    695     def _break(self):
    696         # An internal error was detected.  The barrier is set to
    697         # a broken state all parties awakened.
    698         self._state = -2
    699         self._cond.notify_all()
    700 
    701     @property
    702     def parties(self):
    703         """Return the number of threads required to trip the barrier."""
    704         return self._parties
    705 
    706     @property
    707     def n_waiting(self):
    708         """Return the number of threads currently waiting at the barrier."""
    709         # We don't need synchronization here since this is an ephemeral result
    710         # anyway.  It returns the correct value in the steady state.
    711         if self._state == 0:
    712             return self._count
    713         return 0
    714 
    715     @property
    716     def broken(self):
    717         """Return True if the barrier is in a broken state."""
    718         return self._state == -2
    719 
    720 # exception raised by the Barrier class
    721 class BrokenBarrierError(RuntimeError):
    722     pass
    723 
    724 
    725 # Helper to generate new thread names
    726 _counter = _count().__next__
    727 _counter() # Consume 0 so first non-main thread has id 1.
    728 def _newname(template="Thread-%d"):
    729     return template % _counter()
    730 
# Active thread administration
# NOTE(review): presumably this lock guards _active and _limbo; their users
# are outside this part of the file -- confirm.
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
_limbo = {}     # NOTE(review): contents not visible here -- confirm usage
_dangling = WeakSet()   # weak references only; entries do not keep objects alive
    736 
    737 # Main class for threads
    738 
    739 class Thread:
    740     """A class that represents a thread of control.
    741 
    742     This class can be safely subclassed in a limited fashion. There are two ways
    743     to specify the activity: by passing a callable object to the constructor, or
    744     by overriding the run() method in a subclass.
    745 
    746     """
    747 
    748     _initialized = False
    749     # Need to store a reference to sys.exc_info for printing
    750     # out exceptions when a thread tries to use a global var. during interp.
    751     # shutdown and thus raises an exception about trying to perform some
    752     # operation on/with a NoneType
    753     _exc_info = _sys.exc_info
    754     # Keep sys.exc_clear too to clear the exception just before
    755     # allowing .join() to return.
    756     #XXX __exc_clear = _sys.exc_clear
    757 
    758     def __init__(self, group=None, target=None, name=None,
    759                  args=(), kwargs=None, *, daemon=None):
    760         """This constructor should always be called with keyword arguments. Arguments are:
    761 
    762         *group* should be None; reserved for future extension when a ThreadGroup
    763         class is implemented.
    764 
    765         *target* is the callable object to be invoked by the run()
    766         method. Defaults to None, meaning nothing is called.
    767 
    768         *name* is the thread name. By default, a unique name is constructed of
    769         the form "Thread-N" where N is a small decimal number.
    770 
    771         *args* is the argument tuple for the target invocation. Defaults to ().
    772 
    773         *kwargs* is a dictionary of keyword arguments for the target
    774         invocation. Defaults to {}.
    775 
    776         If a subclass overrides the constructor, it must make sure to invoke
    777         the base class constructor (Thread.__init__()) before doing anything
    778         else to the thread.
    779 
    780         """
    781         assert group is None, "group argument must be None for now"
    782         if kwargs is None:
    783             kwargs = {}
    784         self._target = target
    785         self._name = str(name or _newname())
    786         self._args = args
    787         self._kwargs = kwargs
    788         if daemon is not None:
    789             self._daemonic = daemon
    790         else:
    791             self._daemonic = current_thread().daemon
    792         self._ident = None
    793         self._tstate_lock = None
    794         self._started = Event()
    795         self._is_stopped = False
    796         self._initialized = True
    797         # sys.stderr is not stored in the class like
    798         # sys.exc_info since it can be changed between instances
    799         self._stderr = _sys.stderr
    800         # For debugging and _after_fork()
    801         _dangling.add(self)
    802 
    803     def _reset_internal_locks(self, is_alive):
    804         # private!  Called by _after_fork() to reset our internal locks as
    805         # they may be in an invalid state leading to a deadlock or crash.
    806         self._started._reset_internal_locks()
    807         if is_alive:
    808             self._set_tstate_lock()
    809         else:
    810             # The thread isn't alive after fork: it doesn't have a tstate
    811             # anymore.
    812             self._is_stopped = True
    813             self._tstate_lock = None
    814 
    815     def __repr__(self):
    816         assert self._initialized, "Thread.__init__() was not called"
    817         status = "initial"
    818         if self._started.is_set():
    819             status = "started"
    820         self.is_alive() # easy way to get ._is_stopped set when appropriate
    821         if self._is_stopped:
    822             status = "stopped"
    823         if self._daemonic:
    824             status += " daemon"
    825         if self._ident is not None:
    826             status += " %s" % self._ident
    827         return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
    828 
    829     def start(self):
    830         """Start the thread's activity.
    831 
    832         It must be called at most once per thread object. It arranges for the
    833         object's run() method to be invoked in a separate thread of control.
    834 
    835         This method will raise a RuntimeError if called more than once on the
    836         same thread object.
    837 
    838         """
    839         if not self._initialized:
    840             raise RuntimeError("thread.__init__() not called")
    841 
    842         if self._started.is_set():
    843             raise RuntimeError("threads can only be started once")
    844         with _active_limbo_lock:
    845             _limbo[self] = self
    846         try:
    847             _start_new_thread(self._bootstrap, ())
    848         except Exception:
    849             with _active_limbo_lock:
    850                 del _limbo[self]
    851             raise
    852         self._started.wait()
    853 
    854     def run(self):
    855         """Method representing the thread's activity.
    856 
    857         You may override this method in a subclass. The standard run() method
    858         invokes the callable object passed to the object's constructor as the
    859         target argument, if any, with sequential and keyword arguments taken
    860         from the args and kwargs arguments, respectively.
    861 
    862         """
    863         try:
    864             if self._target:
    865                 self._target(*self._args, **self._kwargs)
    866         finally:
    867             # Avoid a refcycle if the thread is running a function with
    868             # an argument that has a member that points to the thread.
    869             del self._target, self._args, self._kwargs
    870 
    871     def _bootstrap(self):
    872         # Wrapper around the real bootstrap code that ignores
    873         # exceptions during interpreter cleanup.  Those typically
    874         # happen when a daemon thread wakes up at an unfortunate
    875         # moment, finds the world around it destroyed, and raises some
    876         # random exception *** while trying to report the exception in
    877         # _bootstrap_inner() below ***.  Those random exceptions
    878         # don't help anybody, and they confuse users, so we suppress
    879         # them.  We suppress them only when it appears that the world
    880         # indeed has already been destroyed, so that exceptions in
    881         # _bootstrap_inner() during normal business hours are properly
    882         # reported.  Also, we only suppress them for daemonic threads;
    883         # if a non-daemonic encounters this, something else is wrong.
    884         try:
    885             self._bootstrap_inner()
    886         except:
    887             if self._daemonic and _sys is None:
    888                 return
    889             raise
    890 
    891     def _set_ident(self):
    892         self._ident = get_ident()
    893 
    894     def _set_tstate_lock(self):
    895         """
    896         Set a lock object which will be released by the interpreter when
    897         the underlying thread state (see pystate.h) gets deleted.
    898         """
    899         self._tstate_lock = _set_sentinel()
    900         self._tstate_lock.acquire()
    901 
    902     def _bootstrap_inner(self):
    903         try:
    904             self._set_ident()
    905             self._set_tstate_lock()
    906             self._started.set()
    907             with _active_limbo_lock:
    908                 _active[self._ident] = self
    909                 del _limbo[self]
    910 
    911             if _trace_hook:
    912                 _sys.settrace(_trace_hook)
    913             if _profile_hook:
    914                 _sys.setprofile(_profile_hook)
    915 
    916             try:
    917                 self.run()
    918             except SystemExit:
    919                 pass
    920             except:
    921                 # If sys.stderr is no more (most likely from interpreter
    922                 # shutdown) use self._stderr.  Otherwise still use sys (as in
    923                 # _sys) in case sys.stderr was redefined since the creation of
    924                 # self.
    925                 if _sys and _sys.stderr is not None:
    926                     print("Exception in thread %s:\n%s" %
    927                           (self.name, _format_exc()), file=_sys.stderr)
    928                 elif self._stderr is not None:
    929                     # Do the best job possible w/o a huge amt. of code to
    930                     # approximate a traceback (code ideas from
    931                     # Lib/traceback.py)
    932                     exc_type, exc_value, exc_tb = self._exc_info()
    933                     try:
    934                         print((
    935                             "Exception in thread " + self.name +
    936                             " (most likely raised during interpreter shutdown):"), file=self._stderr)
    937                         print((
    938                             "Traceback (most recent call last):"), file=self._stderr)
    939                         while exc_tb:
    940                             print((
    941                                 '  File "%s", line %s, in %s' %
    942                                 (exc_tb.tb_frame.f_code.co_filename,
    943                                     exc_tb.tb_lineno,
    944                                     exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
    945                             exc_tb = exc_tb.tb_next
    946                         print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
    947                         self._stderr.flush()
    948                     # Make sure that exc_tb gets deleted since it is a memory
    949                     # hog; deleting everything else is just for thoroughness
    950                     finally:
    951                         del exc_type, exc_value, exc_tb
    952             finally:
    953                 # Prevent a race in
    954                 # test_threading.test_no_refcycle_through_target when
    955                 # the exception keeps the target alive past when we
    956                 # assert that it's dead.
    957                 #XXX self._exc_clear()
    958                 pass
    959         finally:
    960             with _active_limbo_lock:
    961                 try:
    962                     # We don't call self._delete() because it also
    963                     # grabs _active_limbo_lock.
    964                     del _active[get_ident()]
    965                 except:
    966                     pass
    967 
    968     def _stop(self):
    969         # After calling ._stop(), .is_alive() returns False and .join() returns
    970         # immediately.  ._tstate_lock must be released before calling ._stop().
    971         #
    972         # Normal case:  C code at the end of the thread's life
    973         # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
    974         # that's detected by our ._wait_for_tstate_lock(), called by .join()
    975         # and .is_alive().  Any number of threads _may_ call ._stop()
    976         # simultaneously (for example, if multiple threads are blocked in
    977         # .join() calls), and they're not serialized.  That's harmless -
    978         # they'll just make redundant rebindings of ._is_stopped and
    979         # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
    980         # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
    981         # (the assert is executed only if ._tstate_lock is None).
    982         #
    983         # Special case:  _main_thread releases ._tstate_lock via this
    984         # module's _shutdown() function.
    985         lock = self._tstate_lock
    986         if lock is not None:
    987             assert not lock.locked()
    988         self._is_stopped = True
    989         self._tstate_lock = None
    990 
    991     def _delete(self):
    992         "Remove current thread from the dict of currently running threads."
    993         with _active_limbo_lock:
    994             del _active[get_ident()]
    995             # There must not be any python code between the previous line
    996             # and after the lock is released.  Otherwise a tracing function
    997             # could try to acquire the lock again in the same thread, (in
    998             # current_thread()), and would block.
    999 
   1000     def join(self, timeout=None):
   1001         """Wait until the thread terminates.
   1002 
   1003         This blocks the calling thread until the thread whose join() method is
   1004         called terminates -- either normally or through an unhandled exception
   1005         or until the optional timeout occurs.
   1006 
   1007         When the timeout argument is present and not None, it should be a
   1008         floating point number specifying a timeout for the operation in seconds
   1009         (or fractions thereof). As join() always returns None, you must call
   1010         is_alive() after join() to decide whether a timeout happened -- if the
   1011         thread is still alive, the join() call timed out.
   1012 
   1013         When the timeout argument is not present or None, the operation will
   1014         block until the thread terminates.
   1015 
   1016         A thread can be join()ed many times.
   1017 
   1018         join() raises a RuntimeError if an attempt is made to join the current
   1019         thread as that would cause a deadlock. It is also an error to join() a
   1020         thread before it has been started and attempts to do so raises the same
   1021         exception.
   1022 
   1023         """
   1024         if not self._initialized:
   1025             raise RuntimeError("Thread.__init__() not called")
   1026         if not self._started.is_set():
   1027             raise RuntimeError("cannot join thread before it is started")
   1028         if self is current_thread():
   1029             raise RuntimeError("cannot join current thread")
   1030 
   1031         if timeout is None:
   1032             self._wait_for_tstate_lock()
   1033         else:
   1034             # the behavior of a negative timeout isn't documented, but
   1035             # historically .join(timeout=x) for x<0 has acted as if timeout=0
   1036             self._wait_for_tstate_lock(timeout=max(timeout, 0))
   1037 
   1038     def _wait_for_tstate_lock(self, block=True, timeout=-1):
   1039         # Issue #18808: wait for the thread state to be gone.
   1040         # At the end of the thread's life, after all knowledge of the thread
   1041         # is removed from C data structures, C code releases our _tstate_lock.
   1042         # This method passes its arguments to _tstate_lock.acquire().
   1043         # If the lock is acquired, the C code is done, and self._stop() is
   1044         # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
   1045         lock = self._tstate_lock
   1046         if lock is None:  # already determined that the C code is done
   1047             assert self._is_stopped
   1048         elif lock.acquire(block, timeout):
   1049             lock.release()
   1050             self._stop()
   1051 
   1052     @property
   1053     def name(self):
   1054         """A string used for identification purposes only.
   1055 
   1056         It has no semantics. Multiple threads may be given the same name. The
   1057         initial name is set by the constructor.
   1058 
   1059         """
   1060         assert self._initialized, "Thread.__init__() not called"
   1061         return self._name
   1062 
   1063     @name.setter
   1064     def name(self, name):
   1065         assert self._initialized, "Thread.__init__() not called"
   1066         self._name = str(name)
   1067 
   1068     @property
   1069     def ident(self):
   1070         """Thread identifier of this thread or None if it has not been started.
   1071 
   1072         This is a nonzero integer. See the get_ident() function. Thread
   1073         identifiers may be recycled when a thread exits and another thread is
   1074         created. The identifier is available even after the thread has exited.
   1075 
   1076         """
   1077         assert self._initialized, "Thread.__init__() not called"
   1078         return self._ident
   1079 
   1080     def is_alive(self):
   1081         """Return whether the thread is alive.
   1082 
   1083         This method returns True just before the run() method starts until just
   1084         after the run() method terminates. The module function enumerate()
   1085         returns a list of all alive threads.
   1086 
   1087         """
   1088         assert self._initialized, "Thread.__init__() not called"
   1089         if self._is_stopped or not self._started.is_set():
   1090             return False
   1091         self._wait_for_tstate_lock(False)
   1092         return not self._is_stopped
   1093 
   1094     def isAlive(self):
   1095         """Return whether the thread is alive.
   1096 
   1097         This method is deprecated, use is_alive() instead.
   1098         """
   1099         import warnings
   1100         warnings.warn('isAlive() is deprecated, use is_alive() instead',
   1101                       PendingDeprecationWarning, stacklevel=2)
   1102         return self.is_alive()
   1103 
   1104     @property
   1105     def daemon(self):
   1106         """A boolean value indicating whether this thread is a daemon thread.
   1107 
   1108         This must be set before start() is called, otherwise RuntimeError is
   1109         raised. Its initial value is inherited from the creating thread; the
   1110         main thread is not a daemon thread and therefore all threads created in
   1111         the main thread default to daemon = False.
   1112 
   1113         The entire Python program exits when no alive non-daemon threads are
   1114         left.
   1115 
   1116         """
   1117         assert self._initialized, "Thread.__init__() not called"
   1118         return self._daemonic
   1119 
   1120     @daemon.setter
   1121     def daemon(self, daemonic):
   1122         if not self._initialized:
   1123             raise RuntimeError("Thread.__init__() not called")
   1124         if self._started.is_set():
   1125             raise RuntimeError("cannot set daemon status of active thread")
   1126         self._daemonic = daemonic
   1127 
   1128     def isDaemon(self):
   1129         return self.daemon
   1130 
   1131     def setDaemon(self, daemonic):
   1132         self.daemon = daemonic
   1133 
   1134     def getName(self):
   1135         return self.name
   1136 
   1137     def setName(self, name):
   1138         self.name = name
   1139 
   1140 # The timer class was contributed by Itamar Shtull-Trauring
   1141 
   1142 class Timer(Thread):
   1143     """Call a function after a specified number of seconds:
   1144 
   1145             t = Timer(30.0, f, args=None, kwargs=None)
   1146             t.start()
   1147             t.cancel()     # stop the timer's action if it's still waiting
   1148 
   1149     """
   1150 
   1151     def __init__(self, interval, function, args=None, kwargs=None):
   1152         Thread.__init__(self)
   1153         self.interval = interval
   1154         self.function = function
   1155         self.args = args if args is not None else []
   1156         self.kwargs = kwargs if kwargs is not None else {}
   1157         self.finished = Event()
   1158 
   1159     def cancel(self):
   1160         """Stop the timer if it hasn't finished yet."""
   1161         self.finished.set()
   1162 
   1163     def run(self):
   1164         self.finished.wait(self.interval)
   1165         if not self.finished.is_set():
   1166             self.function(*self.args, **self.kwargs)
   1167         self.finished.set()
   1168 
   1169 
   1170 # Special thread class to represent the main thread
   1171 
   1172 class _MainThread(Thread):
   1173 
   1174     def __init__(self):
   1175         Thread.__init__(self, name="MainThread", daemon=False)
   1176         self._set_tstate_lock()
   1177         self._started.set()
   1178         self._set_ident()
   1179         with _active_limbo_lock:
   1180             _active[self._ident] = self
   1181 
   1182 
   1183 # Dummy thread class to represent threads not started here.
   1184 # These aren't garbage collected when they die, nor can they be waited for.
   1185 # If they invoke anything in threading.py that calls current_thread(), they
   1186 # leave an entry in the _active dict forever after.
   1187 # Their purpose is to return *something* from current_thread().
   1188 # They are marked as daemon threads so we won't wait for them
   1189 # when we exit (conform previous semantics).
   1190 
   1191 class _DummyThread(Thread):
   1192 
   1193     def __init__(self):
   1194         Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
   1195 
   1196         self._started.set()
   1197         self._set_ident()
   1198         with _active_limbo_lock:
   1199             _active[self._ident] = self
   1200 
   1201     def _stop(self):
   1202         pass
   1203 
   1204     def is_alive(self):
   1205         assert not self._is_stopped and self._started.is_set()
   1206         return True
   1207 
   1208     def join(self, timeout=None):
   1209         assert False, "cannot join a dummy thread"
   1210 
   1211 
   1212 # Global API functions
   1213 
   1214 def current_thread():
   1215     """Return the current Thread object, corresponding to the caller's thread of control.
   1216 
   1217     If the caller's thread of control was not created through the threading
   1218     module, a dummy thread object with limited functionality is returned.
   1219 
   1220     """
   1221     try:
   1222         return _active[get_ident()]
   1223     except KeyError:
   1224         return _DummyThread()
   1225 
   1226 currentThread = current_thread
   1227 
   1228 def active_count():
   1229     """Return the number of Thread objects currently alive.
   1230 
   1231     The returned count is equal to the length of the list returned by
   1232     enumerate().
   1233 
   1234     """
   1235     with _active_limbo_lock:
   1236         return len(_active) + len(_limbo)
   1237 
   1238 activeCount = active_count
   1239 
   1240 def _enumerate():
   1241     # Same as enumerate(), but without the lock. Internal use only.
   1242     return list(_active.values()) + list(_limbo.values())
   1243 
   1244 def enumerate():
   1245     """Return a list of all Thread objects currently alive.
   1246 
   1247     The list includes daemonic threads, dummy thread objects created by
   1248     current_thread(), and the main thread. It excludes terminated threads and
   1249     threads that have not yet been started.
   1250 
   1251     """
   1252     with _active_limbo_lock:
   1253         return list(_active.values()) + list(_limbo.values())
   1254 
   1255 from _thread import stack_size
   1256 
# Create the main thread object at import time.  The _shutdown() function
# defined below operates on it and is the hook made available to the
# interpreter (Py_Main) as threading._shutdown.

_main_thread = _MainThread()
   1262 
   1263 def _shutdown():
   1264     # Obscure:  other threads may be waiting to join _main_thread.  That's
   1265     # dubious, but some code does it.  We can't wait for C code to release
   1266     # the main thread's tstate_lock - that won't happen until the interpreter
   1267     # is nearly dead.  So we release it here.  Note that just calling _stop()
   1268     # isn't enough:  other threads may already be waiting on _tstate_lock.
   1269     if _main_thread._is_stopped:
   1270         # _shutdown() was already called
   1271         return
   1272     tlock = _main_thread._tstate_lock
   1273     # The main thread isn't finished yet, so its thread state lock can't have
   1274     # been released.
   1275     assert tlock is not None
   1276     assert tlock.locked()
   1277     tlock.release()
   1278     _main_thread._stop()
   1279     t = _pickSomeNonDaemonThread()
   1280     while t:
   1281         t.join()
   1282         t = _pickSomeNonDaemonThread()
   1283 
   1284 def _pickSomeNonDaemonThread():
   1285     for t in enumerate():
   1286         if not t.daemon and t.is_alive():
   1287             return t
   1288     return None
   1289 
   1290 def main_thread():
   1291     """Return the main thread object.
   1292 
   1293     In normal conditions, the main thread is the thread from which the
   1294     Python interpreter was started.
   1295     """
   1296     return _main_thread
   1297 
   1298 # get thread-local implementation, either from the thread
   1299 # module, or from the python fallback
   1300 
   1301 try:
   1302     from _thread import _local as local
   1303 except ImportError:
   1304     from _threading_local import local
   1305 
   1306 
   1307 def _after_fork():
   1308     """
   1309     Cleanup threading module state that should not exist after a fork.
   1310     """
   1311     # Reset _active_limbo_lock, in case we forked while the lock was held
   1312     # by another (non-forked) thread.  http://bugs.python.org/issue874900
   1313     global _active_limbo_lock, _main_thread
   1314     _active_limbo_lock = _allocate_lock()
   1315 
   1316     # fork() only copied the current thread; clear references to others.
   1317     new_active = {}
   1318     current = current_thread()
   1319     _main_thread = current
   1320     with _active_limbo_lock:
   1321         # Dangling thread instances must still have their locks reset,
   1322         # because someone may join() them.
   1323         threads = set(_enumerate())
   1324         threads.update(_dangling)
   1325         for thread in threads:
   1326             # Any lock/condition variable may be currently locked or in an
   1327             # invalid state, so we reinitialize them.
   1328             if thread is current:
   1329                 # There is only one active thread. We reset the ident to
   1330                 # its new value since it can have changed.
   1331                 thread._reset_internal_locks(True)
   1332                 ident = get_ident()
   1333                 thread._ident = ident
   1334                 new_active[ident] = thread
   1335             else:
   1336                 # All the others are already stopped.
   1337                 thread._reset_internal_locks(False)
   1338                 thread._stop()
   1339 
   1340         _limbo.clear()
   1341         _active.clear()
   1342         _active.update(new_active)
   1343         assert len(_active) == 1
   1344 
   1345 
# os.register_at_fork may be missing, hence the hasattr guard.  When it is
# available, arrange for _after_fork() to run in the child process after a
# fork.
if hasattr(_os, "register_at_fork"):
    _os.register_at_fork(after_in_child=_after_fork)
   1348