Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module implementing queues
      3 #
      4 # multiprocessing/queues.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # All rights reserved.
      8 #
      9 # Redistribution and use in source and binary forms, with or without
     10 # modification, are permitted provided that the following conditions
     11 # are met:
     12 #
     13 # 1. Redistributions of source code must retain the above copyright
     14 #    notice, this list of conditions and the following disclaimer.
     15 # 2. Redistributions in binary form must reproduce the above copyright
     16 #    notice, this list of conditions and the following disclaimer in the
     17 #    documentation and/or other materials provided with the distribution.
     18 # 3. Neither the name of author nor the names of any contributors may be
     19 #    used to endorse or promote products derived from this software
     20 #    without specific prior written permission.
     21 #
     22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     25 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     32 # SUCH DAMAGE.
     33 #
     34 
# Names exported by ``from <this module> import *``; all three classes are
# defined further down in this file.
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
     36 
     37 import sys
     38 import os
     39 import threading
     40 import collections
     41 import time
     42 import atexit
     43 import weakref
     44 
     45 from Queue import Empty, Full
     46 import _multiprocessing
     47 from multiprocessing import Pipe
     48 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
     49 from multiprocessing.util import debug, info, Finalize, register_after_fork
     50 from multiprocessing.forking import assert_spawning
     51 
     52 #
     53 # Queue type using a pipe, buffer and thread
     54 #
     55 
     56 class Queue(object):
     57 
     58     def __init__(self, maxsize=0):
     59         if maxsize <= 0:
     60             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
     61         self._maxsize = maxsize
     62         self._reader, self._writer = Pipe(duplex=False)
     63         self._rlock = Lock()
     64         self._opid = os.getpid()
     65         if sys.platform == 'win32':
     66             self._wlock = None
     67         else:
     68             self._wlock = Lock()
     69         self._sem = BoundedSemaphore(maxsize)
     70 
     71         self._after_fork()
     72 
     73         if sys.platform != 'win32':
     74             register_after_fork(self, Queue._after_fork)
     75 
     76     def __getstate__(self):
     77         assert_spawning(self)
     78         return (self._maxsize, self._reader, self._writer,
     79                 self._rlock, self._wlock, self._sem, self._opid)
     80 
     81     def __setstate__(self, state):
     82         (self._maxsize, self._reader, self._writer,
     83          self._rlock, self._wlock, self._sem, self._opid) = state
     84         self._after_fork()
     85 
     86     def _after_fork(self):
     87         debug('Queue._after_fork()')
     88         self._notempty = threading.Condition(threading.Lock())
     89         self._buffer = collections.deque()
     90         self._thread = None
     91         self._jointhread = None
     92         self._joincancelled = False
     93         self._closed = False
     94         self._close = None
     95         self._send = self._writer.send
     96         self._recv = self._reader.recv
     97         self._poll = self._reader.poll
     98 
     99     def put(self, obj, block=True, timeout=None):
    100         assert not self._closed
    101         if not self._sem.acquire(block, timeout):
    102             raise Full
    103 
    104         self._notempty.acquire()
    105         try:
    106             if self._thread is None:
    107                 self._start_thread()
    108             self._buffer.append(obj)
    109             self._notempty.notify()
    110         finally:
    111             self._notempty.release()
    112 
    113     def get(self, block=True, timeout=None):
    114         if block and timeout is None:
    115             self._rlock.acquire()
    116             try:
    117                 res = self._recv()
    118                 self._sem.release()
    119                 return res
    120             finally:
    121                 self._rlock.release()
    122 
    123         else:
    124             if block:
    125                 deadline = time.time() + timeout
    126             if not self._rlock.acquire(block, timeout):
    127                 raise Empty
    128             try:
    129                 if block:
    130                     timeout = deadline - time.time()
    131                     if timeout < 0 or not self._poll(timeout):
    132                         raise Empty
    133                 elif not self._poll():
    134                     raise Empty
    135                 res = self._recv()
    136                 self._sem.release()
    137                 return res
    138             finally:
    139                 self._rlock.release()
    140 
    141     def qsize(self):
    142         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
    143         return self._maxsize - self._sem._semlock._get_value()
    144 
    145     def empty(self):
    146         return not self._poll()
    147 
    148     def full(self):
    149         return self._sem._semlock._is_zero()
    150 
    151     def get_nowait(self):
    152         return self.get(False)
    153 
    154     def put_nowait(self, obj):
    155         return self.put(obj, False)
    156 
    157     def close(self):
    158         self._closed = True
    159         self._reader.close()
    160         if self._close:
    161             self._close()
    162 
    163     def join_thread(self):
    164         debug('Queue.join_thread()')
    165         assert self._closed
    166         if self._jointhread:
    167             self._jointhread()
    168 
    169     def cancel_join_thread(self):
    170         debug('Queue.cancel_join_thread()')
    171         self._joincancelled = True
    172         try:
    173             self._jointhread.cancel()
    174         except AttributeError:
    175             pass
    176 
    177     def _start_thread(self):
    178         debug('Queue._start_thread()')
    179 
    180         # Start thread which transfers data from buffer to pipe
    181         self._buffer.clear()
    182         self._thread = threading.Thread(
    183             target=Queue._feed,
    184             args=(self._buffer, self._notempty, self._send,
    185                   self._wlock, self._writer.close),
    186             name='QueueFeederThread'
    187             )
    188         self._thread.daemon = True
    189 
    190         debug('doing self._thread.start()')
    191         self._thread.start()
    192         debug('... done self._thread.start()')
    193 
    194         # On process exit we will wait for data to be flushed to pipe.
    195         if not self._joincancelled:
    196             self._jointhread = Finalize(
    197                 self._thread, Queue._finalize_join,
    198                 [weakref.ref(self._thread)],
    199                 exitpriority=-5
    200                 )
    201 
    202         # Send sentinel to the thread queue object when garbage collected
    203         self._close = Finalize(
    204             self, Queue._finalize_close,
    205             [self._buffer, self._notempty],
    206             exitpriority=10
    207             )
    208 
    209     @staticmethod
    210     def _finalize_join(twr):
    211         debug('joining queue thread')
    212         thread = twr()
    213         if thread is not None:
    214             thread.join()
    215             debug('... queue thread joined')
    216         else:
    217             debug('... queue thread already dead')
    218 
    219     @staticmethod
    220     def _finalize_close(buffer, notempty):
    221         debug('telling queue thread to quit')
    222         notempty.acquire()
    223         try:
    224             buffer.append(_sentinel)
    225             notempty.notify()
    226         finally:
    227             notempty.release()
    228 
    229     @staticmethod
    230     def _feed(buffer, notempty, send, writelock, close):
    231         debug('starting thread to feed data to pipe')
    232         from .util import is_exiting
    233 
    234         nacquire = notempty.acquire
    235         nrelease = notempty.release
    236         nwait = notempty.wait
    237         bpopleft = buffer.popleft
    238         sentinel = _sentinel
    239         if sys.platform != 'win32':
    240             wacquire = writelock.acquire
    241             wrelease = writelock.release
    242         else:
    243             wacquire = None
    244 
    245         try:
    246             while 1:
    247                 nacquire()
    248                 try:
    249                     if not buffer:
    250                         nwait()
    251                 finally:
    252                     nrelease()
    253                 try:
    254                     while 1:
    255                         obj = bpopleft()
    256                         if obj is sentinel:
    257                             debug('feeder thread got sentinel -- exiting')
    258                             close()
    259                             return
    260 
    261                         if wacquire is None:
    262                             send(obj)
    263                         else:
    264                             wacquire()
    265                             try:
    266                                 send(obj)
    267                             finally:
    268                                 wrelease()
    269                 except IndexError:
    270                     pass
    271         except Exception, e:
    272             # Since this runs in a daemon thread the resources it uses
    273             # may be become unusable while the process is cleaning up.
    274             # We ignore errors which happen after the process has
    275             # started to cleanup.
    276             try:
    277                 if is_exiting():
    278                     info('error in queue thread: %s', e)
    279                 else:
    280                     import traceback
    281                     traceback.print_exc()
    282             except Exception:
    283                 pass
    284 
# Unique marker object: Queue._finalize_close() appends it to the buffer and
# Queue._feed() exits when it pops it (compared by identity).
_sentinel = object()
    286 
    287 #
    288 # A queue type which also supports join() and task_done() methods
    289 #
    290 # Note that if you do not call task_done() for each finished task then
    291 # eventually the counter's semaphore may overflow causing Bad Things
    292 # to happen.
    293 #
    294 
    295 class JoinableQueue(Queue):
    296 
    297     def __init__(self, maxsize=0):
    298         Queue.__init__(self, maxsize)
    299         self._unfinished_tasks = Semaphore(0)
    300         self._cond = Condition()
    301 
    302     def __getstate__(self):
    303         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
    304 
    305     def __setstate__(self, state):
    306         Queue.__setstate__(self, state[:-2])
    307         self._cond, self._unfinished_tasks = state[-2:]
    308 
    309     def put(self, obj, block=True, timeout=None):
    310         assert not self._closed
    311         if not self._sem.acquire(block, timeout):
    312             raise Full
    313 
    314         self._notempty.acquire()
    315         self._cond.acquire()
    316         try:
    317             if self._thread is None:
    318                 self._start_thread()
    319             self._buffer.append(obj)
    320             self._unfinished_tasks.release()
    321             self._notempty.notify()
    322         finally:
    323             self._cond.release()
    324             self._notempty.release()
    325 
    326     def task_done(self):
    327         self._cond.acquire()
    328         try:
    329             if not self._unfinished_tasks.acquire(False):
    330                 raise ValueError('task_done() called too many times')
    331             if self._unfinished_tasks._semlock._is_zero():
    332                 self._cond.notify_all()
    333         finally:
    334             self._cond.release()
    335 
    336     def join(self):
    337         self._cond.acquire()
    338         try:
    339             if not self._unfinished_tasks._semlock._is_zero():
    340                 self._cond.wait()
    341         finally:
    342             self._cond.release()
    343 
    344 #
    345 # Simplified Queue type -- really just a locked pipe
    346 #
    347 
    348 class SimpleQueue(object):
    349 
    350     def __init__(self):
    351         self._reader, self._writer = Pipe(duplex=False)
    352         self._rlock = Lock()
    353         if sys.platform == 'win32':
    354             self._wlock = None
    355         else:
    356             self._wlock = Lock()
    357         self._make_methods()
    358 
    359     def empty(self):
    360         return not self._reader.poll()
    361 
    362     def __getstate__(self):
    363         assert_spawning(self)
    364         return (self._reader, self._writer, self._rlock, self._wlock)
    365 
    366     def __setstate__(self, state):
    367         (self._reader, self._writer, self._rlock, self._wlock) = state
    368         self._make_methods()
    369 
    370     def _make_methods(self):
    371         recv = self._reader.recv
    372         racquire, rrelease = self._rlock.acquire, self._rlock.release
    373         def get():
    374             racquire()
    375             try:
    376                 return recv()
    377             finally:
    378                 rrelease()
    379         self.get = get
    380 
    381         if self._wlock is None:
    382             # writes to a message oriented win32 pipe are atomic
    383             self.put = self._writer.send
    384         else:
    385             send = self._writer.send
    386             wacquire, wrelease = self._wlock.acquire, self._wlock.release
    387             def put(obj):
    388                 wacquire()
    389                 try:
    390                     return send(obj)
    391                 finally:
    392                     wrelease()
    393             self.put = put
    394