#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import weakref
import errno

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):
    """A multi-producer, multi-consumer queue shared between processes.

    Objects put on the queue are appended to an in-process buffer; a
    background feeder thread pickles them and writes the bytes to one end
    of a pipe.  ``get()`` reads and unpickles from the other end.  A bounded
    semaphore tracks how many items are in flight so ``maxsize`` is honoured
    across processes.
    """

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # Writes to a message-oriented win32 pipe are atomic, so no
            # write lock is needed there.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        # Only picklable while a child process is being spawned.
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        # (Re)initialise the per-process state: the buffer, the feeder
        # thread bookkeeping and the bound pipe methods.  Called from
        # __init__, after unpickling in a child, and after os.fork().
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Put *obj* on the queue.

        Raises ValueError if the queue is closed and Full if no slot
        becomes available within the blocking rules.
        """
        # An ``assert`` here disappears under ``python -O``; raise
        # explicitly so a closed queue is always rejected.
        if self._closed:
            raise ValueError("Queue {0!r} has been closed".format(self))
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                # Feeder thread is started lazily on first put().
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        Raises Empty if no item is available within the blocking rules.
        """
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Whatever time was spent waiting for the read lock
                    # counts against the caller's timeout.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        # Only a snapshot; another process may change the answer immediately.
        return not self._poll()

    def full(self):
        # Only a snapshot; another process may change the answer immediately.
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        return self.get(False)

    def put_nowait(self, obj):
        return self.put(obj, False)

    def close(self):
        """Close the queue: no more data can be put by this process."""
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        """Join the feeder thread.  The queue must already be closed."""
        debug('Queue.join_thread()')
        # Explicit raise instead of assert -- asserts are stripped by -O.
        if not self._closed:
            raise ValueError("Queue {0!r} not closed".format(self))
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Prevent process exit from blocking on the feeder thread."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # Feeder thread (and hence its finalizer) was never started.
            pass

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
            )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
        )

    @staticmethod
    def _finalize_join(twr):
        # Wait for the feeder thread to drain; takes a weakref so the
        # finalizer itself does not keep the thread object alive.
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        # Ask the feeder thread to exit by queueing the sentinel.
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        # Body of the feeder thread: pop objects off the buffer, pickle
        # them and push the bytes through the pipe until the sentinel
        # arrives.  Runs as a daemon thread.
        debug('starting thread to feed data to pipe')
        # Bind hot attributes to locals -- this loop is the write path.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained -- go back to waiting on the condition.
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may be become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


# Sentinel object appended to the buffer to tell the feeder thread to exit.
_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):
    """A Queue subclass that additionally tracks unfinished tasks.

    Each put() increments an unfinished-task counter; consumers call
    task_done() for every completed item, and join() blocks until the
    counter drops back to zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Put *obj* on the queue and bump the unfinished-task count."""
        # Explicit raise instead of assert -- asserts are stripped by -O.
        if self._closed:
            raise ValueError("Queue {0!r} is closed".format(self))
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        """Mark one previously-put task as done; may wake join() callers."""
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Block until every put item has been marked with task_done()."""
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
    """A minimal process-shared queue: a pipe guarded by locks.

    No feeder thread, no maxsize, no close/join machinery.
    """

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            # Writes to a message-oriented win32 pipe are atomic.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def empty(self):
        # Only a snapshot; another process may change the answer immediately.
        return not self._poll()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)