Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module implementing queues
      3 #
      4 # multiprocessing/queues.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # All rights reserved.
      8 #
      9 # Redistribution and use in source and binary forms, with or without
     10 # modification, are permitted provided that the following conditions
     11 # are met:
     12 #
     13 # 1. Redistributions of source code must retain the above copyright
     14 #    notice, this list of conditions and the following disclaimer.
     15 # 2. Redistributions in binary form must reproduce the above copyright
     16 #    notice, this list of conditions and the following disclaimer in the
     17 #    documentation and/or other materials provided with the distribution.
     18 # 3. Neither the name of author nor the names of any contributors may be
     19 #    used to endorse or promote products derived from this software
     20 #    without specific prior written permission.
     21 #
     22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     25 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     32 # SUCH DAMAGE.
     33 #
     34 
# Public names exported by ``from multiprocessing.queues import *``.
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
     36 
     37 import sys
     38 import os
     39 import threading
     40 import collections
     41 import time
     42 import atexit
     43 import weakref
     44 
     45 from Queue import Empty, Full
     46 import _multiprocessing
     47 from . import Pipe
     48 from .synchronize import Lock, BoundedSemaphore, Semaphore, Condition
     49 from .util import debug, info, Finalize, register_after_fork, is_exiting
     50 from .forking import assert_spawning
     51 
     52 #
     53 # Queue type using a pipe, buffer and thread
     54 #
     55 
     56 class Queue(object):
     57 
     58     def __init__(self, maxsize=0):
     59         if maxsize <= 0:
     60             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
     61         self._maxsize = maxsize
     62         self._reader, self._writer = Pipe(duplex=False)
     63         self._rlock = Lock()
     64         self._opid = os.getpid()
     65         if sys.platform == 'win32':
     66             self._wlock = None
     67         else:
     68             self._wlock = Lock()
     69         self._sem = BoundedSemaphore(maxsize)
     70 
     71         self._after_fork()
     72 
     73         if sys.platform != 'win32':
     74             register_after_fork(self, Queue._after_fork)
     75 
     76     def __getstate__(self):
     77         assert_spawning(self)
     78         return (self._maxsize, self._reader, self._writer,
     79                 self._rlock, self._wlock, self._sem, self._opid)
     80 
     81     def __setstate__(self, state):
     82         (self._maxsize, self._reader, self._writer,
     83          self._rlock, self._wlock, self._sem, self._opid) = state
     84         self._after_fork()
     85 
     86     def _after_fork(self):
     87         debug('Queue._after_fork()')
     88         self._notempty = threading.Condition(threading.Lock())
     89         self._buffer = collections.deque()
     90         self._thread = None
     91         self._jointhread = None
     92         self._joincancelled = False
     93         self._closed = False
     94         self._close = None
     95         self._send = self._writer.send
     96         self._recv = self._reader.recv
     97         self._poll = self._reader.poll
     98 
     99     def put(self, obj, block=True, timeout=None):
    100         assert not self._closed
    101         if not self._sem.acquire(block, timeout):
    102             raise Full
    103 
    104         self._notempty.acquire()
    105         try:
    106             if self._thread is None:
    107                 self._start_thread()
    108             self._buffer.append(obj)
    109             self._notempty.notify()
    110         finally:
    111             self._notempty.release()
    112 
    113     def get(self, block=True, timeout=None):
    114         if block and timeout is None:
    115             self._rlock.acquire()
    116             try:
    117                 res = self._recv()
    118                 self._sem.release()
    119                 return res
    120             finally:
    121                 self._rlock.release()
    122 
    123         else:
    124             if block:
    125                 deadline = time.time() + timeout
    126             if not self._rlock.acquire(block, timeout):
    127                 raise Empty
    128             try:
    129                 if block:
    130                     timeout = deadline - time.time()
    131                     if timeout < 0 or not self._poll(timeout):
    132                         raise Empty
    133                 elif not self._poll():
    134                     raise Empty
    135                 res = self._recv()
    136                 self._sem.release()
    137                 return res
    138             finally:
    139                 self._rlock.release()
    140 
    141     def qsize(self):
    142         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
    143         return self._maxsize - self._sem._semlock._get_value()
    144 
    145     def empty(self):
    146         return not self._poll()
    147 
    148     def full(self):
    149         return self._sem._semlock._is_zero()
    150 
    151     def get_nowait(self):
    152         return self.get(False)
    153 
    154     def put_nowait(self, obj):
    155         return self.put(obj, False)
    156 
    157     def close(self):
    158         self._closed = True
    159         try:
    160             self._reader.close()
    161         finally:
    162             close = self._close
    163             if close:
    164                 self._close = None
    165                 close()
    166 
    167     def join_thread(self):
    168         debug('Queue.join_thread()')
    169         assert self._closed
    170         if self._jointhread:
    171             self._jointhread()
    172 
    173     def cancel_join_thread(self):
    174         debug('Queue.cancel_join_thread()')
    175         self._joincancelled = True
    176         try:
    177             self._jointhread.cancel()
    178         except AttributeError:
    179             pass
    180 
    181     def _start_thread(self):
    182         debug('Queue._start_thread()')
    183 
    184         # Start thread which transfers data from buffer to pipe
    185         self._buffer.clear()
    186         self._thread = threading.Thread(
    187             target=Queue._feed,
    188             args=(self._buffer, self._notempty, self._send,
    189                   self._wlock, self._writer.close),
    190             name='QueueFeederThread'
    191             )
    192         self._thread.daemon = True
    193 
    194         debug('doing self._thread.start()')
    195         self._thread.start()
    196         debug('... done self._thread.start()')
    197 
    198         # On process exit we will wait for data to be flushed to pipe.
    199         if not self._joincancelled:
    200             self._jointhread = Finalize(
    201                 self._thread, Queue._finalize_join,
    202                 [weakref.ref(self._thread)],
    203                 exitpriority=-5
    204                 )
    205 
    206         # Send sentinel to the thread queue object when garbage collected
    207         self._close = Finalize(
    208             self, Queue._finalize_close,
    209             [self._buffer, self._notempty],
    210             exitpriority=10
    211             )
    212 
    213     @staticmethod
    214     def _finalize_join(twr):
    215         debug('joining queue thread')
    216         thread = twr()
    217         if thread is not None:
    218             thread.join()
    219             debug('... queue thread joined')
    220         else:
    221             debug('... queue thread already dead')
    222 
    223     @staticmethod
    224     def _finalize_close(buffer, notempty):
    225         debug('telling queue thread to quit')
    226         notempty.acquire()
    227         try:
    228             buffer.append(_sentinel)
    229             notempty.notify()
    230         finally:
    231             notempty.release()
    232 
    233     @staticmethod
    234     def _feed(buffer, notempty, send, writelock, close):
    235         debug('starting thread to feed data to pipe')
    236         nacquire = notempty.acquire
    237         nrelease = notempty.release
    238         nwait = notempty.wait
    239         bpopleft = buffer.popleft
    240         sentinel = _sentinel
    241         if sys.platform != 'win32':
    242             wacquire = writelock.acquire
    243             wrelease = writelock.release
    244         else:
    245             wacquire = None
    246 
    247         try:
    248             while 1:
    249                 nacquire()
    250                 try:
    251                     if not buffer:
    252                         nwait()
    253                 finally:
    254                     nrelease()
    255                 try:
    256                     while 1:
    257                         obj = bpopleft()
    258                         if obj is sentinel:
    259                             debug('feeder thread got sentinel -- exiting')
    260                             close()
    261                             return
    262 
    263                         if wacquire is None:
    264                             send(obj)
    265                         else:
    266                             wacquire()
    267                             try:
    268                                 send(obj)
    269                             finally:
    270                                 wrelease()
    271                 except IndexError:
    272                     pass
    273         except Exception, e:
    274             # Since this runs in a daemon thread the resources it uses
    275             # may be become unusable while the process is cleaning up.
    276             # We ignore errors which happen after the process has
    277             # started to cleanup.
    278             try:
    279                 if is_exiting():
    280                     info('error in queue thread: %s', e)
    281                 else:
    282                     import traceback
    283                     traceback.print_exc()
    284             except Exception:
    285                 pass
    286 
# Unique marker object: Queue._finalize_close() appends it to a queue's
# buffer, and the feeder thread (Queue._feed) exits when it pops it.
_sentinel = object()
    288 
    289 #
    290 # A queue type which also supports join() and task_done() methods
    291 #
    292 # Note that if you do not call task_done() for each finished task then
    293 # eventually the counter's semaphore may overflow causing Bad Things
    294 # to happen.
    295 #
    296 
    297 class JoinableQueue(Queue):
    298 
    299     def __init__(self, maxsize=0):
    300         Queue.__init__(self, maxsize)
    301         self._unfinished_tasks = Semaphore(0)
    302         self._cond = Condition()
    303 
    304     def __getstate__(self):
    305         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
    306 
    307     def __setstate__(self, state):
    308         Queue.__setstate__(self, state[:-2])
    309         self._cond, self._unfinished_tasks = state[-2:]
    310 
    311     def put(self, obj, block=True, timeout=None):
    312         assert not self._closed
    313         if not self._sem.acquire(block, timeout):
    314             raise Full
    315 
    316         self._notempty.acquire()
    317         self._cond.acquire()
    318         try:
    319             if self._thread is None:
    320                 self._start_thread()
    321             self._buffer.append(obj)
    322             self._unfinished_tasks.release()
    323             self._notempty.notify()
    324         finally:
    325             self._cond.release()
    326             self._notempty.release()
    327 
    328     def task_done(self):
    329         self._cond.acquire()
    330         try:
    331             if not self._unfinished_tasks.acquire(False):
    332                 raise ValueError('task_done() called too many times')
    333             if self._unfinished_tasks._semlock._is_zero():
    334                 self._cond.notify_all()
    335         finally:
    336             self._cond.release()
    337 
    338     def join(self):
    339         self._cond.acquire()
    340         try:
    341             if not self._unfinished_tasks._semlock._is_zero():
    342                 self._cond.wait()
    343         finally:
    344             self._cond.release()
    345 
    346 #
    347 # Simplified Queue type -- really just a locked pipe
    348 #
    349 
    350 class SimpleQueue(object):
    351 
    352     def __init__(self):
    353         self._reader, self._writer = Pipe(duplex=False)
    354         self._rlock = Lock()
    355         if sys.platform == 'win32':
    356             self._wlock = None
    357         else:
    358             self._wlock = Lock()
    359         self._make_methods()
    360 
    361     def empty(self):
    362         return not self._reader.poll()
    363 
    364     def __getstate__(self):
    365         assert_spawning(self)
    366         return (self._reader, self._writer, self._rlock, self._wlock)
    367 
    368     def __setstate__(self, state):
    369         (self._reader, self._writer, self._rlock, self._wlock) = state
    370         self._make_methods()
    371 
    372     def _make_methods(self):
    373         recv = self._reader.recv
    374         racquire, rrelease = self._rlock.acquire, self._rlock.release
    375         def get():
    376             racquire()
    377             try:
    378                 return recv()
    379             finally:
    380                 rrelease()
    381         self.get = get
    382 
    383         if self._wlock is None:
    384             # writes to a message oriented win32 pipe are atomic
    385             self.put = self._writer.send
    386         else:
    387             send = self._writer.send
    388             wacquire, wrelease = self._wlock.acquire, self._wlock.release
    389             def put(obj):
    390                 wacquire()
    391                 try:
    392                     return send(obj)
    393                 finally:
    394                     wrelease()
    395             self.put = put
    396