Home | History | Annotate | Download | only in Lib
      1 """Thread module emulating a subset of Java's threading model."""
      2 
      3 import sys as _sys
      4 
      5 try:
      6     import thread
      7 except ImportError:
      8     del _sys.modules[__name__]
      9     raise
     10 
     11 import warnings
     12 
     13 from collections import deque as _deque
     14 from itertools import count as _count
     15 from time import time as _time, sleep as _sleep
     16 from traceback import format_exc as _format_exc
     17 
     18 # Note regarding PEP 8 compliant aliases
     19 #  This threading model was originally inspired by Java, and inherited
     20 # the convention of camelCase function and method names from that
     21 # language. While those names are not in any imminent danger of being
     22 # deprecated, starting with Python 2.6, the module now provides a
     23 # PEP 8 compliant alias for any such method name.
     24 # Using the new PEP 8 compliant names also facilitates substitution
     25 # with the multiprocessing module, which doesn't provide the old
     26 # Java inspired names.
     27 
     28 
     29 # Rename some stuff so "from threading import *" is safe
     30 __all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
     31            'current_thread', 'enumerate', 'Event',
     32            'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
     33            'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
     34 
# Bind the low-level primitives of the thread module to module-level names.
# Three of them are private (leading underscore); ThreadError is the only
# alias without one.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
# Drop the module reference itself so that only the aliases above remain in
# this namespace.
del thread
     40 
     41 
     42 # sys.exc_clear is used to work around the fact that except blocks
     43 # don't fully clear the exception until 3.0.
     44 warnings.filterwarnings('ignore', category=DeprecationWarning,
     45                         module='threading', message='sys.exc_clear')
     46 
     47 # Debug support (adapted from ihooks.py).
     48 # All the major classes here derive from _Verbose.  We force that to
     49 # be a new-style class so that all the major classes here are new-style.
     50 # This helps debugging (type(instance) is more revealing for instances
     51 # of new-style classes).
     52 
     53 _VERBOSE = False
     54 
     55 if __debug__:
     56 
     57     class _Verbose(object):
     58 
     59         def __init__(self, verbose=None):
     60             if verbose is None:
     61                 verbose = _VERBOSE
     62             self.__verbose = verbose
     63 
     64         def _note(self, format, *args):
     65             if self.__verbose:
     66                 format = format % args
     67                 # Issue #4188: calling current_thread() can incur an infinite
     68                 # recursion if it has to create a DummyThread on the fly.
     69                 ident = _get_ident()
     70                 try:
     71                     name = _active[ident].name
     72                 except KeyError:
     73                     name = "<OS thread %d>" % ident
     74                 format = "%s: %s\n" % (name, format)
     75                 _sys.stderr.write(format)
     76 
     77 else:
     78     # Disable this when using "python -O"
     79     class _Verbose(object):
     80         def __init__(self, verbose=None):
     81             pass
     82         def _note(self, *args):
     83             pass
     84 
     85 # Support for profile and trace hooks
     86 
     87 _profile_hook = None
     88 _trace_hook = None
     89 
     90 def setprofile(func):
     91     """Set a profile function for all threads started from the threading module.
     92 
     93     The func will be passed to sys.setprofile() for each thread, before its
     94     run() method is called.
     95 
     96     """
     97     global _profile_hook
     98     _profile_hook = func
     99 
    100 def settrace(func):
    101     """Set a trace function for all threads started from the threading module.
    102 
    103     The func will be passed to sys.settrace() for each thread, before its run()
    104     method is called.
    105 
    106     """
    107     global _trace_hook
    108     _trace_hook = func
    109 
    110 # Synchronization classes
    111 
    112 Lock = _allocate_lock
    113 
def RLock(*args, **kwargs):
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    All positional and keyword arguments are forwarded unchanged to the
    _RLock constructor.

    """
    return _RLock(*args, **kwargs)
    124 
    125 class _RLock(_Verbose):
    126     """A reentrant lock must be released by the thread that acquired it. Once a
    127        thread has acquired a reentrant lock, the same thread may acquire it
    128        again without blocking; the thread must release it once for each time it
    129        has acquired it.
    130     """
    131 
    132     def __init__(self, verbose=None):
    133         _Verbose.__init__(self, verbose)
    134         self.__block = _allocate_lock()
    135         self.__owner = None
    136         self.__count = 0
    137 
    138     def __repr__(self):
    139         owner = self.__owner
    140         try:
    141             owner = _active[owner].name
    142         except KeyError:
    143             pass
    144         return "<%s owner=%r count=%d>" % (
    145                 self.__class__.__name__, owner, self.__count)
    146 
    147     def acquire(self, blocking=1):
    148         """Acquire a lock, blocking or non-blocking.
    149 
    150         When invoked without arguments: if this thread already owns the lock,
    151         increment the recursion level by one, and return immediately. Otherwise,
    152         if another thread owns the lock, block until the lock is unlocked. Once
    153         the lock is unlocked (not owned by any thread), then grab ownership, set
    154         the recursion level to one, and return. If more than one thread is
    155         blocked waiting until the lock is unlocked, only one at a time will be
    156         able to grab ownership of the lock. There is no return value in this
    157         case.
    158 
    159         When invoked with the blocking argument set to true, do the same thing
    160         as when called without arguments, and return true.
    161 
    162         When invoked with the blocking argument set to false, do not block. If a
    163         call without an argument would block, return false immediately;
    164         otherwise, do the same thing as when called without arguments, and
    165         return true.
    166 
    167         """
    168         me = _get_ident()
    169         if self.__owner == me:
    170             self.__count = self.__count + 1
    171             if __debug__:
    172                 self._note("%s.acquire(%s): recursive success", self, blocking)
    173             return 1
    174         rc = self.__block.acquire(blocking)
    175         if rc:
    176             self.__owner = me
    177             self.__count = 1
    178             if __debug__:
    179                 self._note("%s.acquire(%s): initial success", self, blocking)
    180         else:
    181             if __debug__:
    182                 self._note("%s.acquire(%s): failure", self, blocking)
    183         return rc
    184 
    185     __enter__ = acquire
    186 
    187     def release(self):
    188         """Release a lock, decrementing the recursion level.
    189 
    190         If after the decrement it is zero, reset the lock to unlocked (not owned
    191         by any thread), and if any other threads are blocked waiting for the
    192         lock to become unlocked, allow exactly one of them to proceed. If after
    193         the decrement the recursion level is still nonzero, the lock remains
    194         locked and owned by the calling thread.
    195 
    196         Only call this method when the calling thread owns the lock. A
    197         RuntimeError is raised if this method is called when the lock is
    198         unlocked.
    199 
    200         There is no return value.
    201 
    202         """
    203         if self.__owner != _get_ident():
    204             raise RuntimeError("cannot release un-acquired lock")
    205         self.__count = count = self.__count - 1
    206         if not count:
    207             self.__owner = None
    208             self.__block.release()
    209             if __debug__:
    210                 self._note("%s.release(): final release", self)
    211         else:
    212             if __debug__:
    213                 self._note("%s.release(): non-final release", self)
    214 
    215     def __exit__(self, t, v, tb):
    216         self.release()
    217 
    218     # Internal methods used by condition variables
    219 
    220     def _acquire_restore(self, count_owner):
    221         count, owner = count_owner
    222         self.__block.acquire()
    223         self.__count = count
    224         self.__owner = owner
    225         if __debug__:
    226             self._note("%s._acquire_restore()", self)
    227 
    228     def _release_save(self):
    229         if __debug__:
    230             self._note("%s._release_save()", self)
    231         count = self.__count
    232         self.__count = 0
    233         owner = self.__owner
    234         self.__owner = None
    235         self.__block.release()
    236         return (count, owner)
    237 
    238     def _is_owned(self):
    239         return self.__owner == _get_ident()
    240 
    241 
def Condition(*args, **kwargs):
    """Factory function that returns a new condition variable object.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    All positional and keyword arguments are forwarded unchanged to the
    _Condition constructor.

    """
    return _Condition(*args, **kwargs)
    254 
    255 class _Condition(_Verbose):
    256     """Condition variables allow one or more threads to wait until they are
    257        notified by another thread.
    258     """
    259 
    260     def __init__(self, lock=None, verbose=None):
    261         _Verbose.__init__(self, verbose)
    262         if lock is None:
    263             lock = RLock()
    264         self.__lock = lock
    265         # Export the lock's acquire() and release() methods
    266         self.acquire = lock.acquire
    267         self.release = lock.release
    268         # If the lock defines _release_save() and/or _acquire_restore(),
    269         # these override the default implementations (which just call
    270         # release() and acquire() on the lock).  Ditto for _is_owned().
    271         try:
    272             self._release_save = lock._release_save
    273         except AttributeError:
    274             pass
    275         try:
    276             self._acquire_restore = lock._acquire_restore
    277         except AttributeError:
    278             pass
    279         try:
    280             self._is_owned = lock._is_owned
    281         except AttributeError:
    282             pass
    283         self.__waiters = []
    284 
    285     def __enter__(self):
    286         return self.__lock.__enter__()
    287 
    288     def __exit__(self, *args):
    289         return self.__lock.__exit__(*args)
    290 
    291     def __repr__(self):
    292         return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
    293 
    294     def _release_save(self):
    295         self.__lock.release()           # No state to save
    296 
    297     def _acquire_restore(self, x):
    298         self.__lock.acquire()           # Ignore saved state
    299 
    300     def _is_owned(self):
    301         # Return True if lock is owned by current_thread.
    302         # This method is called only if __lock doesn't have _is_owned().
    303         if self.__lock.acquire(0):
    304             self.__lock.release()
    305             return False
    306         else:
    307             return True
    308 
    309     def wait(self, timeout=None):
    310         """Wait until notified or until a timeout occurs.
    311 
    312         If the calling thread has not acquired the lock when this method is
    313         called, a RuntimeError is raised.
    314 
    315         This method releases the underlying lock, and then blocks until it is
    316         awakened by a notify() or notifyAll() call for the same condition
    317         variable in another thread, or until the optional timeout occurs. Once
    318         awakened or timed out, it re-acquires the lock and returns.
    319 
    320         When the timeout argument is present and not None, it should be a
    321         floating point number specifying a timeout for the operation in seconds
    322         (or fractions thereof).
    323 
    324         When the underlying lock is an RLock, it is not released using its
    325         release() method, since this may not actually unlock the lock when it
    326         was acquired multiple times recursively. Instead, an internal interface
    327         of the RLock class is used, which really unlocks it even when it has
    328         been recursively acquired several times. Another internal interface is
    329         then used to restore the recursion level when the lock is reacquired.
    330 
    331         """
    332         if not self._is_owned():
    333             raise RuntimeError("cannot wait on un-acquired lock")
    334         waiter = _allocate_lock()
    335         waiter.acquire()
    336         self.__waiters.append(waiter)
    337         saved_state = self._release_save()
    338         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    339             if timeout is None:
    340                 waiter.acquire()
    341                 if __debug__:
    342                     self._note("%s.wait(): got it", self)
    343             else:
    344                 # Balancing act:  We can't afford a pure busy loop, so we
    345                 # have to sleep; but if we sleep the whole timeout time,
    346                 # we'll be unresponsive.  The scheme here sleeps very
    347                 # little at first, longer as time goes on, but never longer
    348                 # than 20 times per second (or the timeout time remaining).
    349                 endtime = _time() + timeout
    350                 delay = 0.0005 # 500 us -> initial delay of 1 ms
    351                 while True:
    352                     gotit = waiter.acquire(0)
    353                     if gotit:
    354                         break
    355                     remaining = endtime - _time()
    356                     if remaining <= 0:
    357                         break
    358                     delay = min(delay * 2, remaining, .05)
    359                     _sleep(delay)
    360                 if not gotit:
    361                     if __debug__:
    362                         self._note("%s.wait(%s): timed out", self, timeout)
    363                     try:
    364                         self.__waiters.remove(waiter)
    365                     except ValueError:
    366                         pass
    367                 else:
    368                     if __debug__:
    369                         self._note("%s.wait(%s): got it", self, timeout)
    370         finally:
    371             self._acquire_restore(saved_state)
    372 
    373     def notify(self, n=1):
    374         """Wake up one or more threads waiting on this condition, if any.
    375 
    376         If the calling thread has not acquired the lock when this method is
    377         called, a RuntimeError is raised.
    378 
    379         This method wakes up at most n of the threads waiting for the condition
    380         variable; it is a no-op if no threads are waiting.
    381 
    382         """
    383         if not self._is_owned():
    384             raise RuntimeError("cannot notify on un-acquired lock")
    385         __waiters = self.__waiters
    386         waiters = __waiters[:n]
    387         if not waiters:
    388             if __debug__:
    389                 self._note("%s.notify(): no waiters", self)
    390             return
    391         self._note("%s.notify(): notifying %d waiter%s", self, n,
    392                    n!=1 and "s" or "")
    393         for waiter in waiters:
    394             waiter.release()
    395             try:
    396                 __waiters.remove(waiter)
    397             except ValueError:
    398                 pass
    399 
    400     def notifyAll(self):
    401         """Wake up all threads waiting on this condition.
    402 
    403         If the calling thread has not acquired the lock when this method
    404         is called, a RuntimeError is raised.
    405 
    406         """
    407         self.notify(len(self.__waiters))
    408 
    409     notify_all = notifyAll
    410 
    411 
def Semaphore(*args, **kwargs):
    """A factory function that returns a new semaphore.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    All positional and keyword arguments are forwarded unchanged to the
    _Semaphore constructor.

    """
    return _Semaphore(*args, **kwargs)
    422 
    423 class _Semaphore(_Verbose):
    424     """Semaphores manage a counter representing the number of release() calls
    425        minus the number of acquire() calls, plus an initial value. The acquire()
    426        method blocks if necessary until it can return without making the counter
    427        negative. If not given, value defaults to 1.
    428 
    429     """
    430 
    431     # After Tim Peters' semaphore class, but not quite the same (no maximum)
    432 
    433     def __init__(self, value=1, verbose=None):
    434         if value < 0:
    435             raise ValueError("semaphore initial value must be >= 0")
    436         _Verbose.__init__(self, verbose)
    437         self.__cond = Condition(Lock())
    438         self.__value = value
    439 
    440     def acquire(self, blocking=1):
    441         """Acquire a semaphore, decrementing the internal counter by one.
    442 
    443         When invoked without arguments: if the internal counter is larger than
    444         zero on entry, decrement it by one and return immediately. If it is zero
    445         on entry, block, waiting until some other thread has called release() to
    446         make it larger than zero. This is done with proper interlocking so that
    447         if multiple acquire() calls are blocked, release() will wake exactly one
    448         of them up. The implementation may pick one at random, so the order in
    449         which blocked threads are awakened should not be relied on. There is no
    450         return value in this case.
    451 
    452         When invoked with blocking set to true, do the same thing as when called
    453         without arguments, and return true.
    454 
    455         When invoked with blocking set to false, do not block. If a call without
    456         an argument would block, return false immediately; otherwise, do the
    457         same thing as when called without arguments, and return true.
    458 
    459         """
    460         rc = False
    461         with self.__cond:
    462             while self.__value == 0:
    463                 if not blocking:
    464                     break
    465                 if __debug__:
    466                     self._note("%s.acquire(%s): blocked waiting, value=%s",
    467                             self, blocking, self.__value)
    468                 self.__cond.wait()
    469             else:
    470                 self.__value = self.__value - 1
    471                 if __debug__:
    472                     self._note("%s.acquire: success, value=%s",
    473                             self, self.__value)
    474                 rc = True
    475         return rc
    476 
    477     __enter__ = acquire
    478 
    479     def release(self):
    480         """Release a semaphore, incrementing the internal counter by one.
    481 
    482         When the counter is zero on entry and another thread is waiting for it
    483         to become larger than zero again, wake up that thread.
    484 
    485         """
    486         with self.__cond:
    487             self.__value = self.__value + 1
    488             if __debug__:
    489                 self._note("%s.release: success, value=%s",
    490                         self, self.__value)
    491             self.__cond.notify()
    492 
    493     def __exit__(self, t, v, tb):
    494         self.release()
    495 
    496 
def BoundedSemaphore(*args, **kwargs):
    """A factory function that returns a new bounded semaphore.

    A bounded semaphore checks to make sure its current value doesn't exceed its
    initial value. If it does, ValueError is raised. In most situations
    semaphores are used to guard resources with limited capacity.

    If the semaphore is released too many times it's a sign of a bug. If not
    given, value defaults to 1.

    Like regular semaphores, bounded semaphores manage a counter representing
    the number of release() calls minus the number of acquire() calls, plus an
    initial value. The acquire() method blocks if necessary until it can return
    without making the counter negative. If not given, value defaults to 1.

    All positional and keyword arguments are forwarded unchanged to the
    _BoundedSemaphore constructor.

    """
    return _BoundedSemaphore(*args, **kwargs)
    514 
    515 class _BoundedSemaphore(_Semaphore):
    516     """A bounded semaphore checks to make sure its current value doesn't exceed
    517        its initial value. If it does, ValueError is raised. In most situations
    518        semaphores are used to guard resources with limited capacity.
    519     """
    520 
    521     def __init__(self, value=1, verbose=None):
    522         _Semaphore.__init__(self, value, verbose)
    523         self._initial_value = value
    524 
    525     def release(self):
    526         """Release a semaphore, incrementing the internal counter by one.
    527 
    528         When the counter is zero on entry and another thread is waiting for it
    529         to become larger than zero again, wake up that thread.
    530 
    531         If the number of releases exceeds the number of acquires,
    532         raise a ValueError.
    533 
    534         """
    535         with self._Semaphore__cond:
    536             if self._Semaphore__value >= self._initial_value:
    537                 raise ValueError("Semaphore released too many times")
    538             self._Semaphore__value += 1
    539             self._Semaphore__cond.notify()
    540 
    541 
def Event(*args, **kwargs):
    """A factory function that returns a new event.

    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true.

    All positional and keyword arguments are forwarded unchanged to the
    _Event constructor.

    """
    return _Event(*args, **kwargs)
    551 
    552 class _Event(_Verbose):
    553     """A factory function that returns a new event object. An event manages a
    554        flag that can be set to true with the set() method and reset to false
    555        with the clear() method. The wait() method blocks until the flag is true.
    556 
    557     """
    558 
    559     # After Tim Peters' event class (without is_posted())
    560 
    561     def __init__(self, verbose=None):
    562         _Verbose.__init__(self, verbose)
    563         self.__cond = Condition(Lock())
    564         self.__flag = False
    565 
    566     def _reset_internal_locks(self):
    567         # private!  called by Thread._reset_internal_locks by _after_fork()
    568         self.__cond.__init__(Lock())
    569 
    570     def isSet(self):
    571         'Return true if and only if the internal flag is true.'
    572         return self.__flag
    573 
    574     is_set = isSet
    575 
    576     def set(self):
    577         """Set the internal flag to true.
    578 
    579         All threads waiting for the flag to become true are awakened. Threads
    580         that call wait() once the flag is true will not block at all.
    581 
    582         """
    583         with self.__cond:
    584             self.__flag = True
    585             self.__cond.notify_all()
    586 
    587     def clear(self):
    588         """Reset the internal flag to false.
    589 
    590         Subsequently, threads calling wait() will block until set() is called to
    591         set the internal flag to true again.
    592 
    593         """
    594         with self.__cond:
    595             self.__flag = False
    596 
    597     def wait(self, timeout=None):
    598         """Block until the internal flag is true.
    599 
    600         If the internal flag is true on entry, return immediately. Otherwise,
    601         block until another thread calls set() to set the flag to true, or until
    602         the optional timeout occurs.
    603 
    604         When the timeout argument is present and not None, it should be a
    605         floating point number specifying a timeout for the operation in seconds
    606         (or fractions thereof).
    607 
    608         This method returns the internal flag on exit, so it will always return
    609         True except if a timeout is given and the operation times out.
    610 
    611         """
    612         with self.__cond:
    613             if not self.__flag:
    614                 self.__cond.wait(timeout)
    615             return self.__flag
    616 
    617 # Helper to generate new thread names
# _counter is the bound next() method of an endless itertools.count()
# iterator, so every call hands out the following integer.
_counter = _count().next
_counter() # Consume 0 so first non-main thread has id 1.
def _newname(template="Thread-%d"):
    """Return *template* formatted with the next value of the shared counter."""
    return template % _counter()
    622 
    623 # Active thread administration
# Held by Thread.start() while it adds a thread to, or removes it from,
# _limbo.
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
# Thread.start() registers a thread here (keyed by the Thread object itself)
# before requesting the OS thread, and removes it again if that request fails.
_limbo = {}
    627 
    628 
    629 # Main class for threads
    630 
    631 class Thread(_Verbose):
    632     """A class that represents a thread of control.
    633 
    634     This class can be safely subclassed in a limited fashion.
    635 
    636     """
    637     __initialized = False
    638     # Need to store a reference to sys.exc_info for printing
    639     # out exceptions when a thread tries to use a global var. during interp.
    640     # shutdown and thus raises an exception about trying to perform some
    641     # operation on/with a NoneType
    642     __exc_info = _sys.exc_info
    643     # Keep sys.exc_clear too to clear the exception just before
    644     # allowing .join() to return.
    645     __exc_clear = _sys.exc_clear
    646 
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Set up a new, not yet started thread object.

        This constructor should always be called with keyword arguments:

        *group* must be None; it is reserved for a future extension when a
        ThreadGroup class is implemented.

        *target* is the callable invoked by the run() method. The default,
        None, means nothing is called.

        *name* is the thread name. When omitted (or otherwise false), a
        unique name of the form "Thread-N" is generated, where N is a small
        decimal number.

        *args* is the argument tuple for the target invocation; it defaults
        to ().

        *kwargs* is a dictionary of keyword arguments for the target
        invocation; None (the default) stands for an empty dictionary.

        *verbose* is handed on to the _Verbose base class.

        A subclass that overrides the constructor must invoke the base class
        constructor (Thread.__init__()) before doing anything else to the
        thread.

        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        # A fresh dict per instance stands in for the None default.
        self.__kwargs = {} if kwargs is None else kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        # Checked by start() and __repr__() to detect subclasses that forgot
        # to call this constructor.
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr
    687 
    def _reset_internal_locks(self):
        # private!  Called by _after_fork() to reset our internal locks as
        # they may be in an invalid state leading to a deadlock or crash.
        # The condition object is re-initialised in place with its default
        # arguments rather than being replaced by a new object.
        if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
            self.__block.__init__()
        # The started event takes care of resetting its own lock.
        self.__started._reset_internal_locks()
    694 
    @property
    def _block(self):
        # used by a unittest
        # Read-only access to the name-mangled __block condition object.
        return self.__block
    699 
    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        # Called from __init__ to pick the initial daemon flag: a new thread
        # takes over the daemon attribute of the thread that creates it.
        return current_thread().daemon
    703 
    def __repr__(self):
        """Return "<ClassName(name, status)>" describing the thread's state."""
        assert self.__initialized, "Thread.__init__() was not called"
        started = self.__started.is_set()
        # "stopped" wins over "started", which wins over "initial".
        if self.__stopped:
            state = "stopped"
        elif started:
            state = "started"
        else:
            state = "initial"
        parts = [state]
        if self.__daemonic:
            parts.append("daemon")
        if self.__ident is not None:
            parts.append("%s" % self.__ident)
        status = " ".join(parts)
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
    716 
    def start(self):
        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object, or if Thread.__init__() was never called for it.

        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("threads can only be started once")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        # Register the thread in _limbo before the OS thread is requested.
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # The thread could not be created: undo the registration and let
            # the caller see the original error.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        # Block until the started event is set.  NOTE(review): presumably it
        # is set by the bootstrap code running in the new thread, which is
        # outside this view -- confirm.
        self.__started.wait()
    742 
    def run(self):
        """Perform the thread's work.

        Subclasses may override this.  The default implementation calls the
        target callable given to the constructor, if there is one, passing it
        the constructor's args as positional arguments and kwargs as keyword
        arguments.

        """
        try:
            if not self.__target:
                return
            self.__target(*self.__args, **self.__kwargs)
        finally:
            # Drop the references even on error or early return: this avoids
            # a refcycle when the thread runs a function with an argument
            # that has a member pointing back to the thread.
            del self.__target
            del self.__args
            del self.__kwargs
    759 
    def __bootstrap(self):
        # Entry point of the new thread: start() hands this method to
        # _start_new_thread().
        #
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # __bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # __bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self.__bootstrap_inner()
        except:
            # The bare except is deliberate (see above).  The module
            # global _sys being None is what is taken as the sign that
            # the world has been destroyed.
            if self.__daemonic and _sys is None:
                return
            raise
    779 
    def _set_ident(self):
        """Store the calling thread's identifier (from _get_ident()) as this object's ident."""
        self.__ident = _get_ident()
    782 
    def __bootstrap_inner(self):
        # Body of a thread started through start(): register the thread,
        # install the module-level trace/profile hooks, call run(), report
        # an unhandled exception on stderr, and finally mark the thread as
        # stopped and unregister it.  Called only via __bootstrap().
        try:
            # Record our identifier and release start(), which is blocked
            # in self.__started.wait().
            self._set_ident()
            self.__started.set()
            # Move ourselves from the "being started" table to the
            # "running" table in one step under the lock.
            with _active_limbo_lock:
                _active[self.__ident] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                self._note("%s.__bootstrap(): registering trace hook", self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                self._note("%s.__bootstrap(): registering profile hook", self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                # SystemExit ends the thread quietly: nothing is printed.
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys and _sys.stderr is not None:
                    print>>_sys.stderr, ("Exception in thread %s:\n%s" %
                                         (self.name, _format_exc()))
                elif self.__stderr is not None:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        # Walk the traceback chain, one "File ..." line per
                        # frame.
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
                self.__exc_clear()
        finally:
            # Runs however the thread ended: wake up joiners and drop the
            # thread from _active.
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    # Best effort: a failure to unregister is ignored.
                    pass
    856 
    def __stop(self):
        """Flag the thread as stopped and wake up every thread blocked in join()."""
        # DummyThreads delete self.__block, but they have no waiters to
        # notify anyway (join() is forbidden on them); for them this is a
        # no-op and the stopped flag is left untouched.
        if hasattr(self, '_Thread__block'):
            cond = self.__block
            cond.acquire()
            self.__stopped = True
            cond.notify_all()
            cond.release()
    866 
    def __delete(self):
        """Remove current thread from the dict of currently running threads.

        The entry removed is the one keyed by the *calling* thread's
        identifier.  A KeyError is re-raised unless the dummy_threading
        module has been imported (see the notes below).
        """

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
                # current_thread()), and would block.
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise
    901 
    def join(self, timeout=None):
        """Block the caller until this thread has terminated.

        The wait ends when the thread finishes, whether normally or because
        of an unhandled exception, or when the optional timeout expires.

        timeout, when given and not None, is a floating point number of
        seconds (fractions allowed).  join() always returns None, so call
        isAlive() afterwards to find out whether it timed out: if the thread
        is still alive, it did.  With timeout omitted or None the call blocks
        until the thread terminates.

        A thread can be join()ed many times.

        A RuntimeError is raised when the thread tries to join itself, since
        that would deadlock, and also when the thread has not been started
        yet.

        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        cond = self.__block
        cond.acquire()
        try:
            if timeout is None:
                # Unbounded wait: sleep until __stop() notifies us.
                while not self.__stopped:
                    cond.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
                return
            # Bounded wait: recompute the remaining time after each wakeup.
            deadline = _time() + timeout
            while not self.__stopped:
                remaining = deadline - _time()
                if remaining <= 0:
                    if __debug__:
                        self._note("%s.join(): timed out", self)
                    return
                cond.wait(remaining)
            if __debug__:
                self._note("%s.join(): thread stopped", self)
        finally:
            cond.release()
    957 
    @property
    def name(self):
        """A string used for identification purposes only.

        It has no semantics. Multiple threads may be given the same name. The
        initial name is set by the constructor.

        """
        # Sanity check only: being an assert, it is skipped under -O.
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    @name.setter
    def name(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        # Whatever is assigned is stored as its str() form.
        self.__name = str(name)
    973 
    @property
    def ident(self):
        """Thread identifier of this thread or None if it has not been started.

        This is a nonzero integer. See the thread.get_ident() function. Thread
        identifiers may be recycled when a thread exits and another thread is
        created. The identifier is available even after the thread has exited.

        """
        # Sanity check only: being an assert, it is skipped under -O.
        assert self.__initialized, "Thread.__init__() not called"
        # Filled in by _set_ident().
        return self.__ident
    985 
    def isAlive(self):
        """Tell whether the thread is currently alive.

        The answer is True from just before run() starts until just after
        run() terminates.  enumerate(), the module-level function, returns
        the list of all alive threads.

        """
        assert self.__initialized, "Thread.__init__() not called"
        if not self.__started.is_set():
            return False
        return not self.__stopped

    is_alive = isAlive
    998 
    @property
    def daemon(self):
        """Boolean flag: True for a daemon thread, False otherwise.

        Assign it before calling start(); assigning afterwards raises
        RuntimeError.  The initial value is inherited from the creating
        thread.  The main thread is not a daemon thread, so threads created
        in the main thread default to daemon = False.

        The entire Python program exits when no alive non-daemon threads are
        left.

        """
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    @daemon.setter
    def daemon(self, daemonic):
        # Unlike the getter, the setter reports misuse with RuntimeError
        # rather than an assert.
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        already_started = self.__started.is_set()
        if already_started:
            raise RuntimeError("cannot set daemon status of active thread")
        self.__daemonic = daemonic
   1022 
    def isDaemon(self):
        """Return the value of the daemon property (method-style accessor)."""
        return self.daemon
   1025 
    def setDaemon(self, daemonic):
        """Assign daemonic to the daemon property (method-style accessor).

        Raises whatever the daemon setter raises, i.e. RuntimeError when
        Thread.__init__() was not called or the thread was already started.
        """
        self.daemon = daemonic
   1028 
    def getName(self):
        """Return the value of the name property (method-style accessor)."""
        return self.name
   1031 
    def setName(self, name):
        """Assign name to the name property, which stores it as str(name)."""
        self.name = name
   1034 
   1035 # The timer class was contributed by Itamar Shtull-Trauring
   1036 
   1037 def Timer(*args, **kwargs):
   1038     """Factory function to create a Timer object.
   1039 
   1040     Timers call a function after a specified number of seconds:
   1041 
   1042         t = Timer(30.0, f, args=[], kwargs={})
   1043         t.start()
   1044         t.cancel()     # stop the timer's action if it's still waiting
   1045 
   1046     """
   1047     return _Timer(*args, **kwargs)
   1048 
   1049 class _Timer(Thread):
   1050     """Call a function after a specified number of seconds:
   1051 
   1052             t = Timer(30.0, f, args=[], kwargs={})
   1053             t.start()
   1054             t.cancel()     # stop the timer's action if it's still waiting
   1055 
   1056     """
   1057 
   1058     def __init__(self, interval, function, args=[], kwargs={}):
   1059         Thread.__init__(self)
   1060         self.interval = interval
   1061         self.function = function
   1062         self.args = args
   1063         self.kwargs = kwargs
   1064         self.finished = Event()
   1065 
   1066     def cancel(self):
   1067         """Stop the timer if it hasn't finished yet"""
   1068         self.finished.set()
   1069 
   1070     def run(self):
   1071         self.finished.wait(self.interval)
   1072         if not self.finished.is_set():
   1073             self.function(*self.args, **self.kwargs)
   1074         self.finished.set()
   1075 
   1076 # Special thread class to represent the main thread
   1077 # This is garbage collected through an exit handler
   1078 
   1079 class _MainThread(Thread):
   1080 
   1081     def __init__(self):
   1082         Thread.__init__(self, name="MainThread")
   1083         self._Thread__started.set()
   1084         self._set_ident()
   1085         with _active_limbo_lock:
   1086             _active[_get_ident()] = self
   1087 
   1088     def _set_daemon(self):
   1089         return False
   1090 
   1091     def _exitfunc(self):
   1092         self._Thread__stop()
   1093         t = _pickSomeNonDaemonThread()
   1094         if t:
   1095             if __debug__:
   1096                 self._note("%s: waiting for other threads", self)
   1097         while t:
   1098             t.join()
   1099             t = _pickSomeNonDaemonThread()
   1100         if __debug__:
   1101             self._note("%s: exiting", self)
   1102         self._Thread__delete()
   1103 
   1104 def _pickSomeNonDaemonThread():
   1105     for t in enumerate():
   1106         if not t.daemon and t.is_alive():
   1107             return t
   1108     return None
   1109 
   1110 
   1111 # Dummy thread class to represent threads not started here.
   1112 # These aren't garbage collected when they die, nor can they be waited for.
   1113 # If they invoke anything in threading.py that calls current_thread(), they
   1114 # leave an entry in the _active dict forever after.
   1115 # Their purpose is to return *something* from current_thread().
   1116 # They are marked as daemon threads so we won't wait for them
# when we exit (this conforms to the previous semantics).
   1118 
   1119 class _DummyThread(Thread):
   1120 
   1121     def __init__(self):
   1122         Thread.__init__(self, name=_newname("Dummy-%d"))
   1123 
   1124         # Thread.__block consumes an OS-level locking primitive, which
   1125         # can never be used by a _DummyThread.  Since a _DummyThread
   1126         # instance is immortal, that's bad, so release this resource.
   1127         del self._Thread__block
   1128 
   1129         self._Thread__started.set()
   1130         self._set_ident()
   1131         with _active_limbo_lock:
   1132             _active[_get_ident()] = self
   1133 
   1134     def _set_daemon(self):
   1135         return True
   1136 
   1137     def join(self, timeout=None):
   1138         assert False, "cannot join a dummy thread"
   1139 
   1140 
   1141 # Global API functions
   1142 
   1143 def currentThread():
   1144     """Return the current Thread object, corresponding to the caller's thread of control.
   1145 
   1146     If the caller's thread of control was not created through the threading
   1147     module, a dummy thread object with limited functionality is returned.
   1148 
   1149     """
   1150     try:
   1151         return _active[_get_ident()]
   1152     except KeyError:
   1153         ##print "current_thread(): no current thread for", _get_ident()
   1154         return _DummyThread()
   1155 
   1156 current_thread = currentThread
   1157 
   1158 def activeCount():
   1159     """Return the number of Thread objects currently alive.
   1160 
   1161     The returned count is equal to the length of the list returned by
   1162     enumerate().
   1163 
   1164     """
   1165     with _active_limbo_lock:
   1166         return len(_active) + len(_limbo)
   1167 
   1168 active_count = activeCount
   1169 
def _enumerate():
    """Return the threads in _active followed by those in _limbo, without locking."""
    # Same as enumerate(), but without the lock. Internal use only.
    # _after_fork() calls this while it already holds _active_limbo_lock.
    return _active.values() + _limbo.values()
   1173 
   1174 def enumerate():
   1175     """Return a list of all Thread objects currently alive.
   1176 
   1177     The list includes daemonic threads, dummy thread objects created by
   1178     current_thread(), and the main thread. It excludes terminated threads and
   1179     threads that have not yet been started.
   1180 
   1181     """
   1182     with _active_limbo_lock:
   1183         return _active.values() + _limbo.values()
   1184 
   1185 from thread import stack_size
   1186 
# Create the main thread object and make its _exitfunc method available
# to the interpreter (Py_Main) as threading._shutdown.  This runs at
# import time: constructing _MainThread registers the importing thread
# in _active as the main thread.

_shutdown = _MainThread()._exitfunc
   1192 
   1193 # get thread-local implementation, either from the thread
   1194 # module, or from the python fallback
   1195 
   1196 try:
   1197     from thread import _local as local
   1198 except ImportError:
   1199     from _threading_local import local
   1200 
   1201 
def _after_fork():
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}
    current = current_thread()
    with _active_limbo_lock:
        # _enumerate(), not enumerate(): the lock is already held here and
        # enumerate() would try to acquire it a second time.
        for thread in _enumerate():
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if hasattr(thread, '_reset_internal_locks'):
                thread._reset_internal_locks()
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                ident = _get_ident()
                thread._Thread__ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                thread._Thread__stop()

        # Rebuild the tables in place so that they keep only the entry for
        # the surviving thread.
        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1
   1235 
   1236 
   1237 # Self-test code
   1238 
   1239 def _test():
   1240 
   1241     class BoundedQueue(_Verbose):
   1242 
   1243         def __init__(self, limit):
   1244             _Verbose.__init__(self)
   1245             self.mon = RLock()
   1246             self.rc = Condition(self.mon)
   1247             self.wc = Condition(self.mon)
   1248             self.limit = limit
   1249             self.queue = _deque()
   1250 
   1251         def put(self, item):
   1252             self.mon.acquire()
   1253             while len(self.queue) >= self.limit:
   1254                 self._note("put(%s): queue full", item)
   1255                 self.wc.wait()
   1256             self.queue.append(item)
   1257             self._note("put(%s): appended, length now %d",
   1258                        item, len(self.queue))
   1259             self.rc.notify()
   1260             self.mon.release()
   1261 
   1262         def get(self):
   1263             self.mon.acquire()
   1264             while not self.queue:
   1265                 self._note("get(): queue empty")
   1266                 self.rc.wait()
   1267             item = self.queue.popleft()
   1268             self._note("get(): got %s, %d left", item, len(self.queue))
   1269             self.wc.notify()
   1270             self.mon.release()
   1271             return item
   1272 
   1273     class ProducerThread(Thread):
   1274 
   1275         def __init__(self, queue, quota):
   1276             Thread.__init__(self, name="Producer")
   1277             self.queue = queue
   1278             self.quota = quota
   1279 
   1280         def run(self):
   1281             from random import random
   1282             counter = 0
   1283             while counter < self.quota:
   1284                 counter = counter + 1
   1285                 self.queue.put("%s.%d" % (self.name, counter))
   1286                 _sleep(random() * 0.00001)
   1287 
   1288 
   1289     class ConsumerThread(Thread):
   1290 
   1291         def __init__(self, queue, count):
   1292             Thread.__init__(self, name="Consumer")
   1293             self.queue = queue
   1294             self.count = count
   1295 
   1296         def run(self):
   1297             while self.count > 0:
   1298                 item = self.queue.get()
   1299                 print item
   1300                 self.count = self.count - 1
   1301 
   1302     NP = 3
   1303     QL = 4
   1304     NI = 5
   1305 
   1306     Q = BoundedQueue(QL)
   1307     P = []
   1308     for i in range(NP):
   1309         t = ProducerThread(Q, NI)
   1310         t.name = ("Producer-%d" % (i+1))
   1311         P.append(t)
   1312     C = ConsumerThread(Q, NI*NP)
   1313     for t in P:
   1314         t.start()
   1315         _sleep(0.000001)
   1316     C.start()
   1317     for t in P:
   1318         t.join()
   1319     C.join()
   1320 
# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    _test()
   1323