#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import itertools
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from multiprocessing.process import current_process, active_children

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

# Numeric logging levels.  SUBDEBUG and SUBWARNING are extra levels that
# get_logger() registers with the logging module under those names.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# The logger is created lazily by get_logger(); until then the helper
# functions below are no-ops.
_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    '''
    Log msg % args at level SUBDEBUG if the logger has been created
    '''
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    '''
    Log msg % args at level DEBUG if the logger has been created
    '''
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    '''
    Log msg % args at level INFO if the logger has been created
    '''
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    '''
    Log msg % args at level SUBWARNING if the logger has been created
    '''
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing

    The logger is created on first use while holding the logging module's
    lock.  It does not propagate to ancestor loggers, and the SUBDEBUG and
    SUBWARNING level names are registered at the same time.
    '''
    global _logger
    import logging, atexit

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0
            logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
            logging.addLevelName(SUBWARNING, 'SUBWARNING')

            # XXX multiprocessing should cleanup before logging
            # Exit handlers run in reverse order of registration, so move
            # _exit_function to the end of the list to make it run first.
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    A StreamHandler using DEFAULT_LOGGING_FORMAT is attached to the
    multiprocessing logger.  If level is true the logger's level is set to
    it.  Returns the multiprocessing logger.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger

#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    '''
    Return the name of a per-process temp directory, creating it if needed

    The name is cached on the current process object.  Removal is scheduled
    through a Finalize with exit priority -100, so it runs among the
    "remaining" finalizers at the end of _exit_function().
    '''
    # get name of a temp directory which will be automatically cleaned up
    if current_process()._tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        current_process()._tempdir = tempdir
    return current_process()._tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

# Maps (registration index, id(obj), func) -> obj.  The values are held
# weakly, so an entry disappears once its object is garbage collected.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    '''
    Call func(obj) for every live registration, in registration order

    Exceptions raised by a callback are logged at INFO level and do not
    prevent the remaining callbacks from running.
    '''
    items = list(_afterfork_registry.items())
    # Registration indices are unique, so this orders by registration.
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    '''
    Register func to be called as func(obj) by _run_after_forkers()

    Only a weak reference to obj is kept.
    '''
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
# Finalization using weakrefs
#

# Maps (exitpriority, creation index) -> Finalize instance.
_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs

    An instance registers itself in _finalizer_registry on creation.  The
    callback runs at most once: when the instance is called directly, when
    obj is garbage collected (the instance is the weakref callback), or
    from _run_finalizers() if exitpriority is not None.
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        obj          -- object whose collection triggers the callback, or
                        None for a finalizer that only runs at exit
        callback     -- callable invoked as callback(*args, **kwargs)
        args, kwargs -- arguments for the callback
        exitpriority -- None or an int; required when obj is None
        '''
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            # Without an object nothing but process exit can trigger us.
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None):
        '''
        Run the callback unless it has already been called or cancelled

        Returns the callback's result, or None if the callback was not run.
        The wr argument is accepted so that the instance can be used as a
        weakref callback; it is ignored.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            sub_debug('finalizer calling %s with args %s and kwargs %s',
                      self._callback, self._args, self._kwargs)
            res = self._callback(*self._args, **self._kwargs)
            # Drop all references held by the finalizer now that it is spent.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        # A finalizer created with obj=None has no _weakref attribute
        # (AttributeError); a spent or cancelled one has _weakref set to
        # None (TypeError).  Both are reported as dead.
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.

    If minpriority is None every finalizer with a non-None priority is run
    and the registry is cleared afterwards.  An exception raised by a
    finalizer is printed with traceback.print_exc() and the remaining
    finalizers still run.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        wanted = lambda key: key[0] is not None
    else:
        wanted = lambda key: key[0] is not None and key[0] >= minpriority

    # The registry may be mutated while we run (a finalizer can be
    # triggered by garbage collection or cancelled elsewhere), so work from
    # a snapshot of the keys and look each finalizer up again before use.
    keys = [key for key in list(_finalizer_registry) if wanted(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        if finalizer is None:
            # Already run or cancelled since the snapshot was taken.
            continue
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    # _exiting is None once this module's globals have been destroyed.
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    '''
    Exit handler: run finalizers and terminate/join child processes

    Finalizers with priority >= 0 run first, then daemonic children are
    terminated and all children joined, then the remaining finalizers run.
    '''
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    # Record that shutdown has begun so that is_exiting() reports it.
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # NB: we check if the current process is None here because if
        # it's None, any call to ``active_children()`` will throw an
        # AttributeError (active_children winds up trying to get
        # attributes from util._current_process).  This happens in a
        # variety of shutdown circumstances that are not well-understood
        # because module-scope variables are not apparently supposed to
        # be destroyed until after this function is called.  However,
        # they are indeed destroyed before this function is called.  See
        # issues 9775 and 15881.  Also related: 4106, 9205, and 9207.

        for p in active_children():
            if p._daemonic:
                info('calling terminate() for daemon %s', p.name)
                p._popen.terminate()

        for p in active_children():
            info('calling join() for process %s', p.name)
            p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    '''
    Lock wrapper which is replaced by a fresh threading.Lock when the
    after-fork callbacks run
    '''
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        # Create a new lock and re-export its acquire/release methods.
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

class ForkAwareLocal(threading.local):
    '''
    threading.local subclass whose attributes are cleared when the
    after-fork callbacks run
    '''
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        # Pickle as a fresh, empty instance of the same type.
        return type(self), ()