Home | History | Annotate | Download | only in python2.7
      1 """Thread module emulating a subset of Java's threading model."""
      2 
      3 import sys as _sys
      4 
      5 try:
      6     import thread
      7 except ImportError:
      8     del _sys.modules[__name__]
      9     raise
     10 
     11 import warnings
     12 
     13 from collections import deque as _deque
     14 from time import time as _time, sleep as _sleep
     15 from traceback import format_exc as _format_exc
     16 
     17 # Note regarding PEP 8 compliant aliases
     18 #  This threading model was originally inspired by Java, and inherited
     19 # the convention of camelCase function and method names from that
     20 # language. While those names are not in any imminent danger of being
     21 # deprecated, starting with Python 2.6, the module now provides a
     22 # PEP 8 compliant alias for any such method name.
     23 # Using the new PEP 8 compliant names also facilitates substitution
     24 # with the multiprocessing module, which doesn't provide the old
     25 # Java inspired names.
     26 
     27 
     28 # Rename some stuff so "from threading import *" is safe
     29 __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
     30            'current_thread', 'enumerate', 'Event',
     31            'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
     32            'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
     33 
# Bind the handful of primitives needed from the low-level ``thread`` module
# to private names, then delete the module reference so that ``thread``
# itself does not remain as an attribute of this module.
_start_new_thread = thread.start_new_thread  # used by Thread.start()
_allocate_lock = thread.allocate_lock        # factory for primitive locks
_get_ident = thread.get_ident                # identifier of the calling thread
ThreadError = thread.error                   # low-level error type, re-exported
del thread
     39 
     40 
     41 # sys.exc_clear is used to work around the fact that except blocks
     42 # don't fully clear the exception until 3.0.
     43 warnings.filterwarnings('ignore', category=DeprecationWarning,
     44                         module='threading', message='sys.exc_clear')
     45 
     46 # Debug support (adapted from ihooks.py).
     47 # All the major classes here derive from _Verbose.  We force that to
     48 # be a new-style class so that all the major classes here are new-style.
     49 # This helps debugging (type(instance) is more revealing for instances
     50 # of new-style classes).
     51 
     52 _VERBOSE = False
     53 
     54 if __debug__:
     55 
     56     class _Verbose(object):
     57 
     58         def __init__(self, verbose=None):
     59             if verbose is None:
     60                 verbose = _VERBOSE
     61             self.__verbose = verbose
     62 
     63         def _note(self, format, *args):
     64             if self.__verbose:
     65                 format = format % args
     66                 # Issue #4188: calling current_thread() can incur an infinite
     67                 # recursion if it has to create a DummyThread on the fly.
     68                 ident = _get_ident()
     69                 try:
     70                     name = _active[ident].name
     71                 except KeyError:
     72                     name = "<OS thread %d>" % ident
     73                 format = "%s: %s\n" % (name, format)
     74                 _sys.stderr.write(format)
     75 
     76 else:
     77     # Disable this when using "python -O"
     78     class _Verbose(object):
     79         def __init__(self, verbose=None):
     80             pass
     81         def _note(self, *args):
     82             pass
     83 
     84 # Support for profile and trace hooks
     85 
# Module-wide hooks stored by setprofile() and settrace() below.
# None means that no hook has been registered.
_profile_hook = None
_trace_hook = None
     88 
     89 def setprofile(func):
     90     """Set a profile function for all threads started from the threading module.
     91 
     92     The func will be passed to sys.setprofile() for each thread, before its
     93     run() method is called.
     94 
     95     """
     96     global _profile_hook
     97     _profile_hook = func
     98 
     99 def settrace(func):
    100     """Set a trace function for all threads started from the threading module.
    101 
    102     The func will be passed to sys.settrace() for each thread, before its run()
    103     method is called.
    104 
    105     """
    106     global _trace_hook
    107     _trace_hook = func
    108 
    109 # Synchronization classes
    110 
# Lock is the primitive lock factory itself; no wrapper class is involved.
Lock = _allocate_lock
    112 
    113 def RLock(*args, **kwargs):
    114     """Factory function that returns a new reentrant lock.
    115 
    116     A reentrant lock must be released by the thread that acquired it. Once a
    117     thread has acquired a reentrant lock, the same thread may acquire it again
    118     without blocking; the thread must release it once for each time it has
    119     acquired it.
    120 
    121     """
    122     return _RLock(*args, **kwargs)
    123 
    124 class _RLock(_Verbose):
    125     """A reentrant lock must be released by the thread that acquired it. Once a
    126        thread has acquired a reentrant lock, the same thread may acquire it
    127        again without blocking; the thread must release it once for each time it
    128        has acquired it.
    129     """
    130 
    131     def __init__(self, verbose=None):
    132         _Verbose.__init__(self, verbose)
    133         self.__block = _allocate_lock()
    134         self.__owner = None
    135         self.__count = 0
    136 
    137     def __repr__(self):
    138         owner = self.__owner
    139         try:
    140             owner = _active[owner].name
    141         except KeyError:
    142             pass
    143         return "<%s owner=%r count=%d>" % (
    144                 self.__class__.__name__, owner, self.__count)
    145 
    146     def acquire(self, blocking=1):
    147         """Acquire a lock, blocking or non-blocking.
    148 
    149         When invoked without arguments: if this thread already owns the lock,
    150         increment the recursion level by one, and return immediately. Otherwise,
    151         if another thread owns the lock, block until the lock is unlocked. Once
    152         the lock is unlocked (not owned by any thread), then grab ownership, set
    153         the recursion level to one, and return. If more than one thread is
    154         blocked waiting until the lock is unlocked, only one at a time will be
    155         able to grab ownership of the lock. There is no return value in this
    156         case.
    157 
    158         When invoked with the blocking argument set to true, do the same thing
    159         as when called without arguments, and return true.
    160 
    161         When invoked with the blocking argument set to false, do not block. If a
    162         call without an argument would block, return false immediately;
    163         otherwise, do the same thing as when called without arguments, and
    164         return true.
    165 
    166         """
    167         me = _get_ident()
    168         if self.__owner == me:
    169             self.__count = self.__count + 1
    170             if __debug__:
    171                 self._note("%s.acquire(%s): recursive success", self, blocking)
    172             return 1
    173         rc = self.__block.acquire(blocking)
    174         if rc:
    175             self.__owner = me
    176             self.__count = 1
    177             if __debug__:
    178                 self._note("%s.acquire(%s): initial success", self, blocking)
    179         else:
    180             if __debug__:
    181                 self._note("%s.acquire(%s): failure", self, blocking)
    182         return rc
    183 
    184     __enter__ = acquire
    185 
    186     def release(self):
    187         """Release a lock, decrementing the recursion level.
    188 
    189         If after the decrement it is zero, reset the lock to unlocked (not owned
    190         by any thread), and if any other threads are blocked waiting for the
    191         lock to become unlocked, allow exactly one of them to proceed. If after
    192         the decrement the recursion level is still nonzero, the lock remains
    193         locked and owned by the calling thread.
    194 
    195         Only call this method when the calling thread owns the lock. A
    196         RuntimeError is raised if this method is called when the lock is
    197         unlocked.
    198 
    199         There is no return value.
    200 
    201         """
    202         if self.__owner != _get_ident():
    203             raise RuntimeError("cannot release un-acquired lock")
    204         self.__count = count = self.__count - 1
    205         if not count:
    206             self.__owner = None
    207             self.__block.release()
    208             if __debug__:
    209                 self._note("%s.release(): final release", self)
    210         else:
    211             if __debug__:
    212                 self._note("%s.release(): non-final release", self)
    213 
    214     def __exit__(self, t, v, tb):
    215         self.release()
    216 
    217     # Internal methods used by condition variables
    218 
    219     def _acquire_restore(self, count_owner):
    220         count, owner = count_owner
    221         self.__block.acquire()
    222         self.__count = count
    223         self.__owner = owner
    224         if __debug__:
    225             self._note("%s._acquire_restore()", self)
    226 
    227     def _release_save(self):
    228         if __debug__:
    229             self._note("%s._release_save()", self)
    230         count = self.__count
    231         self.__count = 0
    232         owner = self.__owner
    233         self.__owner = None
    234         self.__block.release()
    235         return (count, owner)
    236 
    237     def _is_owned(self):
    238         return self.__owner == _get_ident()
    239 
    240 
    241 def Condition(*args, **kwargs):
    242     """Factory function that returns a new condition variable object.
    243 
    244     A condition variable allows one or more threads to wait until they are
    245     notified by another thread.
    246 
    247     If the lock argument is given and not None, it must be a Lock or RLock
    248     object, and it is used as the underlying lock. Otherwise, a new RLock object
    249     is created and used as the underlying lock.
    250 
    251     """
    252     return _Condition(*args, **kwargs)
    253 
    254 class _Condition(_Verbose):
    255     """Condition variables allow one or more threads to wait until they are
    256        notified by another thread.
    257     """
    258 
    259     def __init__(self, lock=None, verbose=None):
    260         _Verbose.__init__(self, verbose)
    261         if lock is None:
    262             lock = RLock()
    263         self.__lock = lock
    264         # Export the lock's acquire() and release() methods
    265         self.acquire = lock.acquire
    266         self.release = lock.release
    267         # If the lock defines _release_save() and/or _acquire_restore(),
    268         # these override the default implementations (which just call
    269         # release() and acquire() on the lock).  Ditto for _is_owned().
    270         try:
    271             self._release_save = lock._release_save
    272         except AttributeError:
    273             pass
    274         try:
    275             self._acquire_restore = lock._acquire_restore
    276         except AttributeError:
    277             pass
    278         try:
    279             self._is_owned = lock._is_owned
    280         except AttributeError:
    281             pass
    282         self.__waiters = []
    283 
    284     def __enter__(self):
    285         return self.__lock.__enter__()
    286 
    287     def __exit__(self, *args):
    288         return self.__lock.__exit__(*args)
    289 
    290     def __repr__(self):
    291         return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
    292 
    293     def _release_save(self):
    294         self.__lock.release()           # No state to save
    295 
    296     def _acquire_restore(self, x):
    297         self.__lock.acquire()           # Ignore saved state
    298 
    299     def _is_owned(self):
    300         # Return True if lock is owned by current_thread.
    301         # This method is called only if __lock doesn't have _is_owned().
    302         if self.__lock.acquire(0):
    303             self.__lock.release()
    304             return False
    305         else:
    306             return True
    307 
    308     def wait(self, timeout=None):
    309         """Wait until notified or until a timeout occurs.
    310 
    311         If the calling thread has not acquired the lock when this method is
    312         called, a RuntimeError is raised.
    313 
    314         This method releases the underlying lock, and then blocks until it is
    315         awakened by a notify() or notifyAll() call for the same condition
    316         variable in another thread, or until the optional timeout occurs. Once
    317         awakened or timed out, it re-acquires the lock and returns.
    318 
    319         When the timeout argument is present and not None, it should be a
    320         floating point number specifying a timeout for the operation in seconds
    321         (or fractions thereof).
    322 
    323         When the underlying lock is an RLock, it is not released using its
    324         release() method, since this may not actually unlock the lock when it
    325         was acquired multiple times recursively. Instead, an internal interface
    326         of the RLock class is used, which really unlocks it even when it has
    327         been recursively acquired several times. Another internal interface is
    328         then used to restore the recursion level when the lock is reacquired.
    329 
    330         """
    331         if not self._is_owned():
    332             raise RuntimeError("cannot wait on un-acquired lock")
    333         waiter = _allocate_lock()
    334         waiter.acquire()
    335         self.__waiters.append(waiter)
    336         saved_state = self._release_save()
    337         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    338             if timeout is None:
    339                 waiter.acquire()
    340                 if __debug__:
    341                     self._note("%s.wait(): got it", self)
    342             else:
    343                 # Balancing act:  We can't afford a pure busy loop, so we
    344                 # have to sleep; but if we sleep the whole timeout time,
    345                 # we'll be unresponsive.  The scheme here sleeps very
    346                 # little at first, longer as time goes on, but never longer
    347                 # than 20 times per second (or the timeout time remaining).
    348                 endtime = _time() + timeout
    349                 delay = 0.0005 # 500 us -> initial delay of 1 ms
    350                 while True:
    351                     gotit = waiter.acquire(0)
    352                     if gotit:
    353                         break
    354                     remaining = endtime - _time()
    355                     if remaining <= 0:
    356                         break
    357                     delay = min(delay * 2, remaining, .05)
    358                     _sleep(delay)
    359                 if not gotit:
    360                     if __debug__:
    361                         self._note("%s.wait(%s): timed out", self, timeout)
    362                     try:
    363                         self.__waiters.remove(waiter)
    364                     except ValueError:
    365                         pass
    366                 else:
    367                     if __debug__:
    368                         self._note("%s.wait(%s): got it", self, timeout)
    369         finally:
    370             self._acquire_restore(saved_state)
    371 
    372     def notify(self, n=1):
    373         """Wake up one or more threads waiting on this condition, if any.
    374 
    375         If the calling thread has not acquired the lock when this method is
    376         called, a RuntimeError is raised.
    377 
    378         This method wakes up at most n of the threads waiting for the condition
    379         variable; it is a no-op if no threads are waiting.
    380 
    381         """
    382         if not self._is_owned():
    383             raise RuntimeError("cannot notify on un-acquired lock")
    384         __waiters = self.__waiters
    385         waiters = __waiters[:n]
    386         if not waiters:
    387             if __debug__:
    388                 self._note("%s.notify(): no waiters", self)
    389             return
    390         self._note("%s.notify(): notifying %d waiter%s", self, n,
    391                    n!=1 and "s" or "")
    392         for waiter in waiters:
    393             waiter.release()
    394             try:
    395                 __waiters.remove(waiter)
    396             except ValueError:
    397                 pass
    398 
    399     def notifyAll(self):
    400         """Wake up all threads waiting on this condition.
    401 
    402         If the calling thread has not acquired the lock when this method
    403         is called, a RuntimeError is raised.
    404 
    405         """
    406         self.notify(len(self.__waiters))
    407 
    408     notify_all = notifyAll
    409 
    410 
    411 def Semaphore(*args, **kwargs):
    412     """A factory function that returns a new semaphore.
    413 
    414     Semaphores manage a counter representing the number of release() calls minus
    415     the number of acquire() calls, plus an initial value. The acquire() method
    416     blocks if necessary until it can return without making the counter
    417     negative. If not given, value defaults to 1.
    418 
    419     """
    420     return _Semaphore(*args, **kwargs)
    421 
    422 class _Semaphore(_Verbose):
    423     """Semaphores manage a counter representing the number of release() calls
    424        minus the number of acquire() calls, plus an initial value. The acquire()
    425        method blocks if necessary until it can return without making the counter
    426        negative. If not given, value defaults to 1.
    427 
    428     """
    429 
    430     # After Tim Peters' semaphore class, but not quite the same (no maximum)
    431 
    432     def __init__(self, value=1, verbose=None):
    433         if value < 0:
    434             raise ValueError("semaphore initial value must be >= 0")
    435         _Verbose.__init__(self, verbose)
    436         self.__cond = Condition(Lock())
    437         self.__value = value
    438 
    439     def acquire(self, blocking=1):
    440         """Acquire a semaphore, decrementing the internal counter by one.
    441 
    442         When invoked without arguments: if the internal counter is larger than
    443         zero on entry, decrement it by one and return immediately. If it is zero
    444         on entry, block, waiting until some other thread has called release() to
    445         make it larger than zero. This is done with proper interlocking so that
    446         if multiple acquire() calls are blocked, release() will wake exactly one
    447         of them up. The implementation may pick one at random, so the order in
    448         which blocked threads are awakened should not be relied on. There is no
    449         return value in this case.
    450 
    451         When invoked with blocking set to true, do the same thing as when called
    452         without arguments, and return true.
    453 
    454         When invoked with blocking set to false, do not block. If a call without
    455         an argument would block, return false immediately; otherwise, do the
    456         same thing as when called without arguments, and return true.
    457 
    458         """
    459         rc = False
    460         with self.__cond:
    461             while self.__value == 0:
    462                 if not blocking:
    463                     break
    464                 if __debug__:
    465                     self._note("%s.acquire(%s): blocked waiting, value=%s",
    466                             self, blocking, self.__value)
    467                 self.__cond.wait()
    468             else:
    469                 self.__value = self.__value - 1
    470                 if __debug__:
    471                     self._note("%s.acquire: success, value=%s",
    472                             self, self.__value)
    473                 rc = True
    474         return rc
    475 
    476     __enter__ = acquire
    477 
    478     def release(self):
    479         """Release a semaphore, incrementing the internal counter by one.
    480 
    481         When the counter is zero on entry and another thread is waiting for it
    482         to become larger than zero again, wake up that thread.
    483 
    484         """
    485         with self.__cond:
    486             self.__value = self.__value + 1
    487             if __debug__:
    488                 self._note("%s.release: success, value=%s",
    489                         self, self.__value)
    490             self.__cond.notify()
    491 
    492     def __exit__(self, t, v, tb):
    493         self.release()
    494 
    495 
    496 def BoundedSemaphore(*args, **kwargs):
    497     """A factory function that returns a new bounded semaphore.
    498 
    499     A bounded semaphore checks to make sure its current value doesn't exceed its
    500     initial value. If it does, ValueError is raised. In most situations
    501     semaphores are used to guard resources with limited capacity.
    502 
    503     If the semaphore is released too many times it's a sign of a bug. If not
    504     given, value defaults to 1.
    505 
    506     Like regular semaphores, bounded semaphores manage a counter representing
    507     the number of release() calls minus the number of acquire() calls, plus an
    508     initial value. The acquire() method blocks if necessary until it can return
    509     without making the counter negative. If not given, value defaults to 1.
    510 
    511     """
    512     return _BoundedSemaphore(*args, **kwargs)
    513 
    514 class _BoundedSemaphore(_Semaphore):
    515     """A bounded semaphore checks to make sure its current value doesn't exceed
    516        its initial value. If it does, ValueError is raised. In most situations
    517        semaphores are used to guard resources with limited capacity.
    518     """
    519 
    520     def __init__(self, value=1, verbose=None):
    521         _Semaphore.__init__(self, value, verbose)
    522         self._initial_value = value
    523 
    524     def release(self):
    525         """Release a semaphore, incrementing the internal counter by one.
    526 
    527         When the counter is zero on entry and another thread is waiting for it
    528         to become larger than zero again, wake up that thread.
    529 
    530         If the number of releases exceeds the number of acquires,
    531         raise a ValueError.
    532 
    533         """
    534         if self._Semaphore__value >= self._initial_value:
    535             raise ValueError("Semaphore released too many times")
    536         return _Semaphore.release(self)
    537 
    538 
    539 def Event(*args, **kwargs):
    540     """A factory function that returns a new event.
    541 
    542     Events manage a flag that can be set to true with the set() method and reset
    543     to false with the clear() method. The wait() method blocks until the flag is
    544     true.
    545 
    546     """
    547     return _Event(*args, **kwargs)
    548 
    549 class _Event(_Verbose):
    550     """A factory function that returns a new event object. An event manages a
    551        flag that can be set to true with the set() method and reset to false
    552        with the clear() method. The wait() method blocks until the flag is true.
    553 
    554     """
    555 
    556     # After Tim Peters' event class (without is_posted())
    557 
    558     def __init__(self, verbose=None):
    559         _Verbose.__init__(self, verbose)
    560         self.__cond = Condition(Lock())
    561         self.__flag = False
    562 
    563     def _reset_internal_locks(self):
    564         # private!  called by Thread._reset_internal_locks by _after_fork()
    565         self.__cond.__init__()
    566 
    567     def isSet(self):
    568         'Return true if and only if the internal flag is true.'
    569         return self.__flag
    570 
    571     is_set = isSet
    572 
    573     def set(self):
    574         """Set the internal flag to true.
    575 
    576         All threads waiting for the flag to become true are awakened. Threads
    577         that call wait() once the flag is true will not block at all.
    578 
    579         """
    580         self.__cond.acquire()
    581         try:
    582             self.__flag = True
    583             self.__cond.notify_all()
    584         finally:
    585             self.__cond.release()
    586 
    587     def clear(self):
    588         """Reset the internal flag to false.
    589 
    590         Subsequently, threads calling wait() will block until set() is called to
    591         set the internal flag to true again.
    592 
    593         """
    594         self.__cond.acquire()
    595         try:
    596             self.__flag = False
    597         finally:
    598             self.__cond.release()
    599 
    600     def wait(self, timeout=None):
    601         """Block until the internal flag is true.
    602 
    603         If the internal flag is true on entry, return immediately. Otherwise,
    604         block until another thread calls set() to set the flag to true, or until
    605         the optional timeout occurs.
    606 
    607         When the timeout argument is present and not None, it should be a
    608         floating point number specifying a timeout for the operation in seconds
    609         (or fractions thereof).
    610 
    611         This method returns the internal flag on exit, so it will always return
    612         True except if a timeout is given and the operation times out.
    613 
    614         """
    615         self.__cond.acquire()
    616         try:
    617             if not self.__flag:
    618                 self.__cond.wait(timeout)
    619             return self.__flag
    620         finally:
    621             self.__cond.release()
    622 
    623 # Helper to generate new thread names
    624 _counter = 0
    625 def _newname(template="Thread-%d"):
    626     global _counter
    627     _counter = _counter + 1
    628     return template % _counter
    629 
    630 # Active thread administration
_active_limbo_lock = _allocate_lock()  # held while Thread.start() updates _limbo
_active = {}    # maps thread id to Thread object
_limbo = {}     # Thread objects entered by start(), each keyed by itself
    634 
    635 
    636 # Main class for threads
    637 
    638 class Thread(_Verbose):
    639     """A class that represents a thread of control.
    640 
    641     This class can be safely subclassed in a limited fashion.
    642 
    643     """
    644     __initialized = False
    645     # Need to store a reference to sys.exc_info for printing
    646     # out exceptions when a thread tries to use a global var. during interp.
    647     # shutdown and thus raises an exception about trying to perform some
    648     # operation on/with a NoneType
    649     __exc_info = _sys.exc_info
    650     # Keep sys.exc_clear too to clear the exception just before
    651     # allowing .join() to return.
    652     __exc_clear = _sys.exc_clear
    653 
    654     def __init__(self, group=None, target=None, name=None,
    655                  args=(), kwargs=None, verbose=None):
    656         """This constructor should always be called with keyword arguments. Arguments are:
    657 
    658         *group* should be None; reserved for future extension when a ThreadGroup
    659         class is implemented.
    660 
    661         *target* is the callable object to be invoked by the run()
    662         method. Defaults to None, meaning nothing is called.
    663 
    664         *name* is the thread name. By default, a unique name is constructed of
    665         the form "Thread-N" where N is a small decimal number.
    666 
    667         *args* is the argument tuple for the target invocation. Defaults to ().
    668 
    669         *kwargs* is a dictionary of keyword arguments for the target
    670         invocation. Defaults to {}.
    671 
    672         If a subclass overrides the constructor, it must make sure to invoke
    673         the base class constructor (Thread.__init__()) before doing anything
    674         else to the thread.
    675 
    676 """
    677         assert group is None, "group argument must be None for now"
    678         _Verbose.__init__(self, verbose)
    679         if kwargs is None:
    680             kwargs = {}
    681         self.__target = target
    682         self.__name = str(name or _newname())
    683         self.__args = args
    684         self.__kwargs = kwargs
    685         self.__daemonic = self._set_daemon()
    686         self.__ident = None
    687         self.__started = Event()
    688         self.__stopped = False
    689         self.__block = Condition(Lock())
    690         self.__initialized = True
    691         # sys.stderr is not stored in the class like
    692         # sys.exc_info since it can be changed between instances
    693         self.__stderr = _sys.stderr
    694 
    695     def _reset_internal_locks(self):
    696         # private!  Called by _after_fork() to reset our internal locks as
    697         # they may be in an invalid state leading to a deadlock or crash.
    698         if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
    699             self.__block.__init__()
    700         self.__started._reset_internal_locks()
    701 
    702     @property
    703     def _block(self):
    704         # used by a unittest
    705         return self.__block
    706 
    707     def _set_daemon(self):
    708         # Overridden in _MainThread and _DummyThread
    709         return current_thread().daemon
    710 
    711     def __repr__(self):
    712         assert self.__initialized, "Thread.__init__() was not called"
    713         status = "initial"
    714         if self.__started.is_set():
    715             status = "started"
    716         if self.__stopped:
    717             status = "stopped"
    718         if self.__daemonic:
    719             status += " daemon"
    720         if self.__ident is not None:
    721             status += " %s" % self.__ident
    722         return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
    723 
    def start(self):
        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object, or if Thread.__init__() was never called.

        The call returns only after the new thread has set the internal
        "started" event.

        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("threads can only be started once")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        # Register in _limbo first; __bootstrap_inner() moves the thread from
        # _limbo to _active once it is actually running.
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # The low-level thread could not be created: undo the limbo
            # registration before propagating the error.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        # Wait until __bootstrap_inner() signals that the thread is running.
        self.__started.wait()
    749 
    def run(self):
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        The target is called whenever one was supplied, i.e. whenever it is
        not None; its truth value is deliberately not consulted, so callable
        objects that happen to evaluate as false are still run.

        """
        try:
            # Compare against None rather than testing truthiness: a callable
            # defining __len__/__nonzero__ may be falsy yet must still run.
            if self.__target is not None:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs
    766 
    def __bootstrap(self):
        # Thin wrapper around __bootstrap_inner() that swallows exceptions
        # raised during interpreter cleanup.
        #
        # A daemon thread may wake up at an unfortunate moment, find the
        # world around it destroyed, and raise some random exception *** while
        # trying to report the exception in __bootstrap_inner() ***.  Such
        # exceptions help nobody and only confuse users, so they are dropped.
        #
        # They are dropped only when the world really does appear to be gone
        # (_sys has been set to None), so exceptions raised by
        # __bootstrap_inner() during normal business hours are still reported,
        # and only for daemonic threads: if a non-daemonic thread gets here,
        # something else is wrong.
        try:
            self.__bootstrap_inner()
        except:
            during_teardown = self.__daemonic and _sys is None
            if not during_teardown:
                raise
    786 
    def _set_ident(self):
        """Record the calling thread's identifier as this object's ident."""
        ident = _get_ident()
        self.__ident = ident
    789 
    def __bootstrap_inner(self):
        # Body executed in the new thread: register the thread, install any
        # trace/profile hooks, call run(), report an unhandled exception, and
        # finally mark the thread as stopped and unregister it.
        try:
            self._set_ident()
            # Setting the started event releases the start() call that is
            # waiting on it.
            self.__started.set()
            with _active_limbo_lock:
                # Move from "being started" (_limbo) to "running" (_active).
                _active[self.__ident] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                self._note("%s.__bootstrap(): registering trace hook", self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                self._note("%s.__bootstrap(): registering profile hook", self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                # SystemExit just ends the thread; nothing is written to
                # stderr for it.
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.name, _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        # Walk the traceback chain, one "File ..." line per
                        # frame.
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
                self.__exc_clear()
        finally:
            # Always runs, however the thread ended: mark it stopped (which
            # wakes any join() waiters) and drop it from _active.
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    # Best effort: any failure to unregister is ignored.
                    pass
    863 
    def __stop(self):
        """Mark the thread as stopped and wake everything waiting on it."""
        # DummyThreads delete self.__block, but they have no waiters to
        # notify anyway (join() is forbidden on them).
        if hasattr(self, '_Thread__block'):
            cond = self.__block
            cond.acquire()
            self.__stopped = True
            cond.notify_all()
            cond.release()
    873 
    def __delete(self):
        """Remove current thread from the dict of currently running threads.

        The entry removed is the one keyed by the *calling* thread's ident.
        A KeyError is re-raised unless 'dummy_threading' is in sys.modules.
        """

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
                # current_thread()), and would block.
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise
    908 
    def join(self, timeout=None):
        """Block the caller until this thread has terminated.

        The wait ends when the thread finishes -- normally or through an
        unhandled exception -- or when the optional timeout expires.

        timeout, when given and not None, is a floating point number of
        seconds (or fractions thereof) to wait at most. join() always returns
        None, so call isAlive() afterwards to find out whether it timed out:
        if the thread is still alive, the join() call timed out.

        With timeout omitted or None the call blocks until the thread
        terminates.

        A thread can be join()ed many times.

        Raises RuntimeError when Thread.__init__() was not called, when the
        thread has not been started yet, or when a thread tries to join
        itself (which would deadlock).

        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        with self.__block:
            if timeout is None:
                # Unbounded wait: loop until __stop() flips the flag.
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
                return

            # Bounded wait: recompute the remaining time on every wake-up.
            deadline = _time() + timeout
            while not self.__stopped:
                remaining = deadline - _time()
                if remaining <= 0:
                    if __debug__:
                        self._note("%s.join(): timed out", self)
                    return
                self.__block.wait(remaining)
            if __debug__:
                self._note("%s.join(): thread stopped", self)
    964 
    @property
    def name(self):
        """The thread's name: a string used for identification only.

        The name carries no semantics and several threads may share one.
        Values assigned to it are converted with str().

        """
        assert self.__initialized, "Thread.__init__() not called"
        current = self.__name
        return current

    @name.setter
    def name(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        as_text = str(name)
        self.__name = as_text
    980 
    @property
    def ident(self):
        """Identifier of this thread, or None if it has not been started.

        The value comes from thread.get_ident() and is a nonzero integer.
        Identifiers may be recycled once a thread exits and another one is
        created; the value stays readable after the thread has exited.

        """
        assert self.__initialized, "Thread.__init__() not called"
        thread_id = self.__ident
        return thread_id
    992 
    def isAlive(self):
        """Tell whether the thread is alive.

        The result is true from just before run() starts until just after
        run() terminates, i.e. while the thread has been started and has not
        yet been marked as stopped. The module function enumerate() returns a
        list of all alive threads.

        """
        assert self.__initialized, "Thread.__init__() not called"
        started = self.__started.is_set()
        return started and not self.__stopped

    # PEP 8 compliant alias.
    is_alive = isAlive
   1005 
   1006     @property
   1007     def daemon(self):
   1008         """A boolean value indicating whether this thread is a daemon thread (True) or not (False).
   1009 
   1010         This must be set before start() is called, otherwise RuntimeError is
   1011         raised. Its initial value is inherited from the creating thread; the
   1012         main thread is not a daemon thread and therefore all threads created in
   1013         the main thread default to daemon = False.
   1014 
   1015         The entire Python program exits when no alive non-daemon threads are
   1016         left.
   1017 
   1018         """
   1019         assert self.__initialized, "Thread.__init__() not called"
   1020         return self.__daemonic
   1021 
   1022     @daemon.setter
   1023     def daemon(self, daemonic):
   1024         if not self.__initialized:
   1025             raise RuntimeError("Thread.__init__() not called")
   1026         if self.__started.is_set():
   1027             raise RuntimeError("cannot set daemon status of active thread");
   1028         self.__daemonic = daemonic
   1029 
   1030     def isDaemon(self):
   1031         return self.daemon
   1032 
   1033     def setDaemon(self, daemonic):
   1034         self.daemon = daemonic
   1035 
   1036     def getName(self):
   1037         return self.name
   1038 
   1039     def setName(self, name):
   1040         self.name = name
   1041 
   1042 # The timer class was contributed by Itamar Shtull-Trauring
   1043 
   1044 def Timer(*args, **kwargs):
   1045     """Factory function to create a Timer object.
   1046 
   1047     Timers call a function after a specified number of seconds:
   1048 
   1049         t = Timer(30.0, f, args=[], kwargs={})
   1050         t.start()
   1051         t.cancel()     # stop the timer's action if it's still waiting
   1052 
   1053     """
   1054     return _Timer(*args, **kwargs)
   1055 
   1056 class _Timer(Thread):
   1057     """Call a function after a specified number of seconds:
   1058 
   1059             t = Timer(30.0, f, args=[], kwargs={})
   1060             t.start()
   1061             t.cancel()     # stop the timer's action if it's still waiting
   1062 
   1063     """
   1064 
   1065     def __init__(self, interval, function, args=[], kwargs={}):
   1066         Thread.__init__(self)
   1067         self.interval = interval
   1068         self.function = function
   1069         self.args = args
   1070         self.kwargs = kwargs
   1071         self.finished = Event()
   1072 
   1073     def cancel(self):
   1074         """Stop the timer if it hasn't finished yet"""
   1075         self.finished.set()
   1076 
   1077     def run(self):
   1078         self.finished.wait(self.interval)
   1079         if not self.finished.is_set():
   1080             self.function(*self.args, **self.kwargs)
   1081         self.finished.set()
   1082 
   1083 # Special thread class to represent the main thread
   1084 # This is garbage collected through an exit handler
   1085 
   1086 class _MainThread(Thread):
   1087 
   1088     def __init__(self):
   1089         Thread.__init__(self, name="MainThread")
   1090         self._Thread__started.set()
   1091         self._set_ident()
   1092         with _active_limbo_lock:
   1093             _active[_get_ident()] = self
   1094 
   1095     def _set_daemon(self):
   1096         return False
   1097 
   1098     def _exitfunc(self):
   1099         self._Thread__stop()
   1100         t = _pickSomeNonDaemonThread()
   1101         if t:
   1102             if __debug__:
   1103                 self._note("%s: waiting for other threads", self)
   1104         while t:
   1105             t.join()
   1106             t = _pickSomeNonDaemonThread()
   1107         if __debug__:
   1108             self._note("%s: exiting", self)
   1109         self._Thread__delete()
   1110 
   1111 def _pickSomeNonDaemonThread():
   1112     for t in enumerate():
   1113         if not t.daemon and t.is_alive():
   1114             return t
   1115     return None
   1116 
   1117 
   1118 # Dummy thread class to represent threads not started here.
   1119 # These aren't garbage collected when they die, nor can they be waited for.
   1120 # If they invoke anything in threading.py that calls current_thread(), they
   1121 # leave an entry in the _active dict forever after.
   1122 # Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conforming to previous semantics).
   1125 
   1126 class _DummyThread(Thread):
   1127 
   1128     def __init__(self):
   1129         Thread.__init__(self, name=_newname("Dummy-%d"))
   1130 
   1131         # Thread.__block consumes an OS-level locking primitive, which
   1132         # can never be used by a _DummyThread.  Since a _DummyThread
   1133         # instance is immortal, that's bad, so release this resource.
   1134         del self._Thread__block
   1135 
   1136         self._Thread__started.set()
   1137         self._set_ident()
   1138         with _active_limbo_lock:
   1139             _active[_get_ident()] = self
   1140 
   1141     def _set_daemon(self):
   1142         return True
   1143 
   1144     def join(self, timeout=None):
   1145         assert False, "cannot join a dummy thread"
   1146 
   1147 
   1148 # Global API functions
   1149 
   1150 def currentThread():
   1151     """Return the current Thread object, corresponding to the caller's thread of control.
   1152 
   1153     If the caller's thread of control was not created through the threading
   1154     module, a dummy thread object with limited functionality is returned.
   1155 
   1156     """
   1157     try:
   1158         return _active[_get_ident()]
   1159     except KeyError:
   1160         ##print "current_thread(): no current thread for", _get_ident()
   1161         return _DummyThread()
   1162 
   1163 current_thread = currentThread
   1164 
   1165 def activeCount():
   1166     """Return the number of Thread objects currently alive.
   1167 
   1168     The returned count is equal to the length of the list returned by
   1169     enumerate().
   1170 
   1171     """
   1172     with _active_limbo_lock:
   1173         return len(_active) + len(_limbo)
   1174 
   1175 active_count = activeCount
   1176 
   1177 def _enumerate():
   1178     # Same as enumerate(), but without the lock. Internal use only.
   1179     return _active.values() + _limbo.values()
   1180 
   1181 def enumerate():
   1182     """Return a list of all Thread objects currently alive.
   1183 
   1184     The list includes daemonic threads, dummy thread objects created by
   1185     current_thread(), and the main thread. It excludes terminated threads and
   1186     threads that have not yet been started.
   1187 
   1188     """
   1189     with _active_limbo_lock:
   1190         return _active.values() + _limbo.values()
   1191 
   1192 from thread import stack_size
   1193 
# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.
# Instantiating _MainThread registers the importing thread in _active;
# _shutdown is the bound _exitfunc method of that instance.

_shutdown = _MainThread()._exitfunc
   1199 
   1200 # get thread-local implementation, either from the thread
   1201 # module, or from the python fallback
   1202 
   1203 try:
   1204     from thread import _local as local
   1205 except ImportError:
   1206     from _threading_local import local
   1207 
   1208 
   1209 def _after_fork():
   1210     # This function is called by Python/ceval.c:PyEval_ReInitThreads which
   1211     # is called from PyOS_AfterFork.  Here we cleanup threading module state
   1212     # that should not exist after a fork.
   1213 
   1214     # Reset _active_limbo_lock, in case we forked while the lock was held
   1215     # by another (non-forked) thread.  http://bugs.python.org/issue874900
   1216     global _active_limbo_lock
   1217     _active_limbo_lock = _allocate_lock()
   1218 
   1219     # fork() only copied the current thread; clear references to others.
   1220     new_active = {}
   1221     current = current_thread()
   1222     with _active_limbo_lock:
   1223         for thread in _active.itervalues():
   1224             # Any lock/condition variable may be currently locked or in an
   1225             # invalid state, so we reinitialize them.
   1226             if hasattr(thread, '_reset_internal_locks'):
   1227                 thread._reset_internal_locks()
   1228             if thread is current:
   1229                 # There is only one active thread. We reset the ident to
   1230                 # its new value since it can have changed.
   1231                 ident = _get_ident()
   1232                 thread._Thread__ident = ident
   1233                 new_active[ident] = thread
   1234             else:
   1235                 # All the others are already stopped.
   1236                 thread._Thread__stop()
   1237 
   1238         _limbo.clear()
   1239         _active.clear()
   1240         _active.update(new_active)
   1241         assert len(_active) == 1
   1242 
   1243 
   1244 # Self-test code
   1245 
   1246 def _test():
   1247 
   1248     class BoundedQueue(_Verbose):
   1249 
   1250         def __init__(self, limit):
   1251             _Verbose.__init__(self)
   1252             self.mon = RLock()
   1253             self.rc = Condition(self.mon)
   1254             self.wc = Condition(self.mon)
   1255             self.limit = limit
   1256             self.queue = _deque()
   1257 
   1258         def put(self, item):
   1259             self.mon.acquire()
   1260             while len(self.queue) >= self.limit:
   1261                 self._note("put(%s): queue full", item)
   1262                 self.wc.wait()
   1263             self.queue.append(item)
   1264             self._note("put(%s): appended, length now %d",
   1265                        item, len(self.queue))
   1266             self.rc.notify()
   1267             self.mon.release()
   1268 
   1269         def get(self):
   1270             self.mon.acquire()
   1271             while not self.queue:
   1272                 self._note("get(): queue empty")
   1273                 self.rc.wait()
   1274             item = self.queue.popleft()
   1275             self._note("get(): got %s, %d left", item, len(self.queue))
   1276             self.wc.notify()
   1277             self.mon.release()
   1278             return item
   1279 
   1280     class ProducerThread(Thread):
   1281 
   1282         def __init__(self, queue, quota):
   1283             Thread.__init__(self, name="Producer")
   1284             self.queue = queue
   1285             self.quota = quota
   1286 
   1287         def run(self):
   1288             from random import random
   1289             counter = 0
   1290             while counter < self.quota:
   1291                 counter = counter + 1
   1292                 self.queue.put("%s.%d" % (self.name, counter))
   1293                 _sleep(random() * 0.00001)
   1294 
   1295 
   1296     class ConsumerThread(Thread):
   1297 
   1298         def __init__(self, queue, count):
   1299             Thread.__init__(self, name="Consumer")
   1300             self.queue = queue
   1301             self.count = count
   1302 
   1303         def run(self):
   1304             while self.count > 0:
   1305                 item = self.queue.get()
   1306                 print item
   1307                 self.count = self.count - 1
   1308 
   1309     NP = 3
   1310     QL = 4
   1311     NI = 5
   1312 
   1313     Q = BoundedQueue(QL)
   1314     P = []
   1315     for i in range(NP):
   1316         t = ProducerThread(Q, NI)
   1317         t.name = ("Producer-%d" % (i+1))
   1318         P.append(t)
   1319     C = ConsumerThread(Q, NI*NP)
   1320     for t in P:
   1321         t.start()
   1322         _sleep(0.000001)
   1323     C.start()
   1324     for t in P:
   1325         t.join()
   1326     C.join()
   1327 
   1328 if __name__ == '__main__':
   1329     _test()
   1330