#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import atexit
import weakref

from Queue import Empty, Full
import _multiprocessing
from multiprocessing import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):
    """FIFO queue built from a one-way pipe, a local buffer and a thread.

    put() appends the object to an in-process deque.  A daemon "feeder"
    thread, started lazily by the first put(), pops objects from that
    deque and sends them through the write end of the pipe.  get() reads
    from the read end of the pipe.  A bounded semaphore is acquired by
    every put() and released by every successful get(), which caps the
    number of outstanding items at ``maxsize``.
    """

    def __init__(self, maxsize=0):
        """Create the pipe, locks and semaphore.

        A ``maxsize`` of zero or less is replaced by
        ``_multiprocessing.SemLock.SEM_VALUE_MAX``.
        """
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock is used on win32 (see SimpleQueue: writes to
            # a message oriented win32 pipe are atomic).
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the shared state as a tuple; asserts we are spawning."""
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state and rebuild the per-process state."""
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)initialise everything that is local to the current process.

        The buffer, its condition, the feeder thread reference and the
        finalizers are reset; the pipe, locks and semaphore are untouched.
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append ``obj`` to the buffer and wake the feeder thread.

        Raises ``Full`` if the semaphore cannot be acquired with the given
        ``block`` / ``timeout`` arguments.  The feeder thread is started on
        the first call.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        """Receive and return one object from the pipe.

        With ``block`` true and ``timeout`` None the call waits for the
        reader lock and then for data, without a time limit.  Otherwise
        ``Empty`` is raised when the reader lock cannot be acquired, or
        when no data can be polled before the deadline (immediately if
        ``block`` is false).  The semaphore is released once an object has
        been received.
        """
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Part of the timeout may have been spent waiting for
                    # the lock; only poll for what is left.
                    timeout = deadline - time.time()
                    if timeout < 0 or not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        """Return ``maxsize`` minus the current value of the semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the semaphore's value is zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and stop the feeder.

        The feeder shutdown finalizer (set by _start_thread) is run even if
        closing the reader raises, so the feeder thread is still sent its
        sentinel in that case.
        """
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                # Drop the reference first so a repeated close() does not
                # invoke the finalizer again.
                self._close = None
                close()

    def join_thread(self):
        """Run the join finalizer, if any; requires close() to have been
        called first."""
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being created."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # self._jointhread is still None: no feeder thread finalizer
            # has been registered.
            pass

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the thread referenced by the weak reference ``twr``, if it
        is still alive as an object."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to ``buffer`` and notify the feeder thread."""
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        """Body of the feeder thread: move objects from ``buffer`` to the pipe.

        Waits on ``notempty`` while the buffer is empty, then drains the
        buffer, sending each object with ``send`` (under ``writelock`` on
        non-win32 platforms).  When the sentinel is popped, ``close`` is
        called and the thread returns.

        An exception raised while handling one object (for example an
        object that cannot be pickled) is reported and the loop carries on
        with the next object, so a single bad item does not stop the
        queue.  The thread only gives up once the process is exiting, or
        if reporting the error itself fails.
        """
        debug('starting thread to feed data to pipe')
        from .util import is_exiting

        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting on notempty.
                    pass
            except Exception as e:
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                try:
                    if is_exiting():
                        info('error in queue thread: %s', e)
                        return
                    # NOTE: the failed object is dropped.  put() already
                    # acquired the semaphore for it and only get()
                    # releases it, so that slot is not given back here.
                    import traceback
                    traceback.print_exc()
                except Exception:
                    # Even reporting failed; stop rather than spin.
                    return

_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):
    """Queue that additionally counts unfinished tasks.

    Every put() releases (increments) the ``_unfinished_tasks`` semaphore
    and every task_done() acquires (decrements) it; join() waits on
    ``_cond`` until task_done() observes the counter at zero.
    """

    def __init__(self, maxsize=0):
        """Initialise the base queue plus the task counter and condition."""
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        """Return the base state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the base state, then the condition and counter."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Like Queue.put(), but also increments the unfinished-task count.

        Raises ``Full`` if the semaphore cannot be acquired.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        self._cond.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()
        finally:
            self._cond.release()
            self._notempty.release()

    def task_done(self):
        """Decrement the unfinished-task count.

        Raises ``ValueError`` if the counter cannot be decremented.  When
        the counter reaches zero all waiters on the condition are notified.
        """
        self._cond.acquire()
        try:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()
        finally:
            self._cond.release()

    def join(self):
        """Wait on the condition unless the unfinished-task count is zero."""
        self._cond.acquire()
        try:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
        finally:
            self._cond.release()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
    """A one-way pipe whose ends are guarded by locks.

    ``get`` and ``put`` are not ordinary methods: _make_methods() builds
    them as closures and stores them on the instance.
    """

    def __init__(self):
        """Create the pipe and locks, then build ``get`` and ``put``."""
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._make_methods()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._reader.poll()

    def __getstate__(self):
        """Return the pipe ends and locks; asserts we are spawning."""
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks and rebuild ``get`` / ``put``."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._make_methods()

    def _make_methods(self):
        """Install ``self.get`` and ``self.put`` as closures over the pipe
        ends and their locks."""
        recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release
        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()
        self.get = get

        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = self._writer.send
        else:
            send = self._writer.send
            wacquire, wrelease = self._wlock.acquire, self._wlock.release
            def put(obj):
                wacquire()
                try:
                    return send(obj)
                finally:
                    wrelease()
            self.put = put