#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Pool', 'ThreadPool']

#
# Imports
#

import threading
import queue
import itertools
import collections
import os
import time
import traceback

# If threading is available then ThreadPool should be provided.  Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))

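# Illustrative sketch (editorial note, not part of the module): mapstar and
# starmapstar unpack a (func, chunk) batch as produced by Pool._get_tasks.
# For example:
#
#     mapstar((abs, (-1, -2, -3)))          # -> [1, 2, 3]
#     starmapstar((pow, [(2, 3), (3, 2)]))  # -> [8, 9]
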
#
# Hack to embed stringification of remote traceback in local traceback
#

class RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return rebuild_exc, (self.exc, self.tb)

def rebuild_exc(exc, tb):
    exc.__cause__ = RemoteTraceback(tb)
    return exc

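# Illustrative sketch (editorial note, not part of the module): when a worker
# raises, the exception travels back pickled as ExceptionWithTraceback and is
# rebuilt locally with the remote stack attached as __cause__.  The function
# `boom` below is hypothetical:
#
#     def boom():
#         raise ValueError('from the worker')
#
#     with Pool(2) as pool:
#         try:
#             pool.apply(boom)
#         except ValueError as e:
#             print(e.__cause__)   # RemoteTraceback: the worker's stack, as text
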
#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self)


def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        # Drop references to the finished task so its objects can be
        # garbage-collected while the worker idles in get().
        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)

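# Illustrative note (editorial, not part of the module): each task on inqueue
# is a 5-tuple and each result on outqueue a 3-tuple:
#
#     task   = (job, i, func, args, kwds)   # worker computes func(*args, **kwds)
#     result = (job, i, (success, value))   # success is False if func raised
#
# so a hypothetical task (7, 0, pow, (2, 5), {}) produces (7, 0, (True, 32)).
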
def _helper_reraises_exception(ex):
    'Pickle-able helper function for use by _guarded_task_generation.'
    raise ex

#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    _wrap_exception = True

    def Process(self, *args, **kwds):
        return self._ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()


        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime.  Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                util.debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        Pool must be running.
        '''
        return self.apply_async(func, args, kwds).get()

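    # Illustrative sketch (editorial note, not part of the module): apply()
    # blocks until one worker has run the call and the result is back.
    #
    #     with Pool(4) as pool:
    #         pool.apply(pow, (2, 10))   # -> 1024
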
    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

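    # Illustrative sketch (editorial note, not part of the module): map()
    # preserves input order even though chunks may finish out of order.
    #
    #     with Pool(4) as pool:
    #         pool.map(abs, [-2, -1, 0, 1])   # -> [2, 1, 0, 1]
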
    def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are expected to
        be iterables as well and will be unpacked as arguments. Hence an
        element `(a, b)` becomes the call `func(a, b)`.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

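    # Illustrative sketch (editorial note, not part of the module):
    #
    #     with Pool(4) as pool:
    #         pool.starmap(pow, [(2, 3), (10, 2)])   # -> [8, 100]
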
    def starmap_async(self, func, iterable, chunksize=None, callback=None,
            error_callback=None):
        '''
        Asynchronous version of `starmap()` method.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize,
                               callback, error_callback)

    def _guarded_task_generation(self, result_job, func, iterable):
        '''Provides a generator of tasks for imap and imap_unordered with
        appropriate handling for iterables which throw exceptions during
        iteration.'''
        try:
            i = -1
            for i, x in enumerate(iterable):
                yield (result_job, i, func, (x,), {})
        except Exception as e:
            yield (result_job, i+1, _helper_reraises_exception, (e,), {})

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0:n}".format(
                        chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)

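    # Illustrative sketch (editorial note, not part of the module): imap()
    # yields results lazily, so the iterable need not fit in memory.
    #
    #     with Pool(4) as pool:
    #         for s in pool.imap(str, range(10**6)):
    #             ...   # results arrive in order, a few at a time
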
    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0!r}".format(chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None,
            error_callback=None):
        '''
        Asynchronous version of `apply()` method.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        result = ApplyResult(self._cache, callback, error_callback)
        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
        return result

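    # Illustrative sketch (editorial note, not part of the module):
    # apply_async() returns immediately with an ApplyResult (alias
    # AsyncResult); callbacks run in the result-handler thread.
    #
    #     with Pool(4) as pool:
    #         res = pool.apply_async(pow, (2, 10),
    #                                callback=print)   # prints 1024 later
    #         res.get(timeout=10)                      # -> 1024
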
    def map_async(self, func, iterable, chunksize=None, callback=None,
            error_callback=None):
        '''
        Asynchronous version of `map()` method.
        '''
        return self._map_async(func, iterable, mapstar, chunksize, callback,
            error_callback)

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
            error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put(
            (
                self._guarded_task_generation(result._job,
                                              mapper,
                                              task_batches),
                None
            )
        )
        return result

    @staticmethod
    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        util.debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail: _guarded_task_generation
                # catches exceptions from the underlying iterable and turns
                # them into _helper_reraises_exception tasks
                for task in taskseq:
                    if thread._state:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                break
            finally:
                task = taskseq = job = None
        else:
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

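    # Illustrative sketch (editorial note, not part of the module): _get_tasks
    # slices the input into (func, chunk) batches; the last chunk may be short.
    #
    #     list(Pool._get_tasks(abs, range(5), 2))
    #     # -> [(abs, (0, 1)), (abs, (2, 3)), (abs, (4,))]
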
    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        util.debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE

    def terminate(self):
        util.debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        util.debug('joining pool')
        if self._state == RUN:
            raise ValueError("Pool is still running")
        elif self._state not in (CLOSE, TERMINATE):
            raise ValueError("In unknown state")
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        util.debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_handler not alive")

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback, error_callback):
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = cache
        self._callback = callback
        self._error_callback = error_callback
        cache[self._job] = self

    def ready(self):
        return self._event.is_set()

    def successful(self):
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        self._event.wait(timeout)

    def get(self, timeout=None):
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]

AsyncResult = ApplyResult       # create alias -- see #17805

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, cache, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._event.set()
            del cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

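    # Illustrative sketch (editorial note, not part of the module): next()
    # accepts a timeout, so the caller can avoid blocking on a slow element.
    #
    #     with Pool(4) as pool:
    #         it = pool.imap(str, range(3))
    #         it.next(timeout=5)   # -> '0', or raises TimeoutError
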
    def _set(self, i, obj):
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]

    def _set_length(self, length):
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]

#
#
#

class ThreadPool(Pool):
    _wrap_exception = False

    @staticmethod
    def Process(*args, **kwds):
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # drain inqueue, and put sentinels at its head to make workers finish
        try:
            while True:
                inqueue.get(block=False)
        except queue.Empty:
            pass
        for i in range(size):
            inqueue.put(None)
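
# Illustrative sketch (editorial note, not part of the module): ThreadPool has
# the same interface as Pool but runs tasks in threads, so functions and
# arguments need not be picklable.
#
#     from multiprocessing.pool import ThreadPool
#
#     with ThreadPool(4) as pool:
#         pool.map(lambda s: s.upper(), ['a', 'b'])   # -> ['A', 'B']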