Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module providing various facilities to other parts of the package
      3 #
      4 # multiprocessing/util.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # All rights reserved.
      8 #
      9 # Redistribution and use in source and binary forms, with or without
     10 # modification, are permitted provided that the following conditions
     11 # are met:
     12 #
     13 # 1. Redistributions of source code must retain the above copyright
     14 #    notice, this list of conditions and the following disclaimer.
     15 # 2. Redistributions in binary form must reproduce the above copyright
     16 #    notice, this list of conditions and the following disclaimer in the
     17 #    documentation and/or other materials provided with the distribution.
     18 # 3. Neither the name of author nor the names of any contributors may be
     19 #    used to endorse or promote products derived from this software
     20 #    without specific prior written permission.
     21 #
     22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     25 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     32 # SUCH DAMAGE.
     33 #
     34 
     35 import itertools
     36 import weakref
     37 import atexit
     38 import threading        # we want threading to install its
     39                         # cleanup function before multiprocessing does
     40 from subprocess import _args_from_interpreter_flags
     41 
     42 from multiprocessing.process import current_process, active_children
     43 
# Names exported by ``from <this module> import *``.
__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'SUBDEBUG', 'SUBWARNING',
    ]
     50 
     51 #
     52 # Logging
     53 #
     54 
# Numeric logging levels used by the helper functions in this module.
# SUBDEBUG and SUBWARNING are extra levels whose names are registered with
# the logging module by get_logger(); SUBDEBUG sorts below DEBUG and
# SUBWARNING sorts just above INFO.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

# Name of the logger returned by get_logger() and the format which
# log_to_stderr() installs on its stream handler.
LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# _logger stays None until get_logger() is first called; while it is None the
# logging helpers do nothing.  _log_to_stderr is set to True by
# log_to_stderr().
_logger = None
_log_to_stderr = False
     66 
     67 def sub_debug(msg, *args):
     68     if _logger:
     69         _logger.log(SUBDEBUG, msg, *args)
     70 
     71 def debug(msg, *args):
     72     if _logger:
     73         _logger.log(DEBUG, msg, *args)
     74 
     75 def info(msg, *args):
     76     if _logger:
     77         _logger.log(INFO, msg, *args)
     78 
     79 def sub_warning(msg, *args):
     80     if _logger:
     81         _logger.log(SUBWARNING, msg, *args)
     82 
     83 def get_logger():
     84     '''
     85     Returns logger used by multiprocessing
     86     '''
     87     global _logger
     88     import logging, atexit
     89 
     90     logging._acquireLock()
     91     try:
     92         if not _logger:
     93 
     94             _logger = logging.getLogger(LOGGER_NAME)
     95             _logger.propagate = 0
     96             logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
     97             logging.addLevelName(SUBWARNING, 'SUBWARNING')
     98 
     99             # XXX multiprocessing should cleanup before logging
    100             if hasattr(atexit, 'unregister'):
    101                 atexit.unregister(_exit_function)
    102                 atexit.register(_exit_function)
    103             else:
    104                 atexit._exithandlers.remove((_exit_function, (), {}))
    105                 atexit._exithandlers.append((_exit_function, (), {}))
    106 
    107     finally:
    108         logging._releaseLock()
    109 
    110     return _logger
    111 
    112 def log_to_stderr(level=None):
    113     '''
    114     Turn on logging and add a handler which prints to stderr
    115     '''
    116     global _log_to_stderr
    117     import logging
    118 
    119     logger = get_logger()
    120     formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    121     handler = logging.StreamHandler()
    122     handler.setFormatter(formatter)
    123     logger.addHandler(handler)
    124 
    125     if level:
    126         logger.setLevel(level)
    127     _log_to_stderr = True
    128     return _logger
    129 
    130 #
    131 # Function returning a temp directory which will be removed on exit
    132 #
    133 
    134 def get_temp_dir():
    135     # get name of a temp directory which will be automatically cleaned up
    136     if current_process()._tempdir is None:
    137         import shutil, tempfile
    138         tempdir = tempfile.mkdtemp(prefix='pymp-')
    139         info('created temp directory %s', tempdir)
    140         Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
    141         current_process()._tempdir = tempdir
    142     return current_process()._tempdir
    143 
    144 #
    145 # Support for reinitialization of objects when bootstrapping a child process
    146 #
    147 
    148 _afterfork_registry = weakref.WeakValueDictionary()
    149 _afterfork_counter = itertools.count()
    150 
    151 def _run_after_forkers():
    152     items = list(_afterfork_registry.items())
    153     items.sort()
    154     for (index, ident, func), obj in items:
    155         try:
    156             func(obj)
    157         except Exception, e:
    158             info('after forker raised exception %s', e)
    159 
    160 def register_after_fork(obj, func):
    161     _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
    162 
    163 #
    164 # Finalization using weakrefs
    165 #
    166 
    167 _finalizer_registry = {}
    168 _finalizer_counter = itertools.count()
    169 
    170 
    171 class Finalize(object):
    172     '''
    173     Class which supports object finalization using weakrefs
    174     '''
    175     def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
    176         assert exitpriority is None or type(exitpriority) is int
    177 
    178         if obj is not None:
    179             self._weakref = weakref.ref(obj, self)
    180         else:
    181             assert exitpriority is not None
    182 
    183         self._callback = callback
    184         self._args = args
    185         self._kwargs = kwargs or {}
    186         self._key = (exitpriority, _finalizer_counter.next())
    187 
    188         _finalizer_registry[self._key] = self
    189 
    190     def __call__(self, wr=None):
    191         '''
    192         Run the callback unless it has already been called or cancelled
    193         '''
    194         try:
    195             del _finalizer_registry[self._key]
    196         except KeyError:
    197             sub_debug('finalizer no longer registered')
    198         else:
    199             sub_debug('finalizer calling %s with args %s and kwargs %s',
    200                      self._callback, self._args, self._kwargs)
    201             res = self._callback(*self._args, **self._kwargs)
    202             self._weakref = self._callback = self._args = \
    203                             self._kwargs = self._key = None
    204             return res
    205 
    206     def cancel(self):
    207         '''
    208         Cancel finalization of the object
    209         '''
    210         try:
    211             del _finalizer_registry[self._key]
    212         except KeyError:
    213             pass
    214         else:
    215             self._weakref = self._callback = self._args = \
    216                             self._kwargs = self._key = None
    217 
    218     def still_active(self):
    219         '''
    220         Return whether this finalizer is still waiting to invoke callback
    221         '''
    222         return self._key in _finalizer_registry
    223 
    224     def __repr__(self):
    225         try:
    226             obj = self._weakref()
    227         except (AttributeError, TypeError):
    228             obj = None
    229 
    230         if obj is None:
    231             return '<Finalize object, dead>'
    232 
    233         x = '<Finalize object, callback=%s' % \
    234             getattr(self._callback, '__name__', self._callback)
    235         if self._args:
    236             x += ', args=' + str(self._args)
    237         if self._kwargs:
    238             x += ', kwargs=' + str(self._kwargs)
    239         if self._key[0] is not None:
    240             x += ', exitprority=' + str(self._key[0])
    241         return x + '>'
    242 
    243 
    244 def _run_finalizers(minpriority=None):
    245     '''
    246     Run all finalizers whose exit priority is not None and at least minpriority
    247 
    248     Finalizers with highest priority are called first; finalizers with
    249     the same priority will be called in reverse order of creation.
    250     '''
    251     if _finalizer_registry is None:
    252         # This function may be called after this module's globals are
    253         # destroyed.  See the _exit_function function in this module for more
    254         # notes.
    255         return
    256 
    257     if minpriority is None:
    258         f = lambda p : p[0][0] is not None
    259     else:
    260         f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
    261 
    262     items = [x for x in _finalizer_registry.items() if f(x)]
    263     items.sort(reverse=True)
    264 
    265     for key, finalizer in items:
    266         sub_debug('calling %s', finalizer)
    267         try:
    268             finalizer()
    269         except Exception:
    270             import traceback
    271             traceback.print_exc()
    272 
    273     if minpriority is None:
    274         _finalizer_registry.clear()
    275 
    276 #
    277 # Clean up on exit
    278 #
    279 
    280 def is_exiting():
    281     '''
    282     Returns true if the process is shutting down
    283     '''
    284     return _exiting or _exiting is None
    285 
    286 _exiting = False
    287 
    288 def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
    289                    active_children=active_children,
    290                    current_process=current_process):
    291     # NB: we hold on to references to functions in the arglist due to the
    292     # situation described below, where this function is called after this
    293     # module's globals are destroyed.
    294 
    295     global _exiting
    296 
    297     info('process shutting down')
    298     debug('running all "atexit" finalizers with priority >= 0')
    299     _run_finalizers(0)
    300 
    301     if current_process() is not None:
    302         # NB: we check if the current process is None here because if
    303         # it's None, any call to ``active_children()`` will throw an
    304         # AttributeError (active_children winds up trying to get
    305         # attributes from util._current_process).  This happens in a
    306         # variety of shutdown circumstances that are not well-understood
    307         # because module-scope variables are not apparently supposed to
    308         # be destroyed until after this function is called.  However,
    309         # they are indeed destroyed before this function is called.  See
    310         # issues 9775 and 15881.  Also related: 4106, 9205, and 9207.
    311 
    312         for p in active_children():
    313             if p._daemonic:
    314                 info('calling terminate() for daemon %s', p.name)
    315                 p._popen.terminate()
    316 
    317         for p in active_children():
    318             info('calling join() for process %s', p.name)
    319             p.join()
    320 
    321     debug('running the remaining "atexit" finalizers')
    322     _run_finalizers()
    323 
    324 atexit.register(_exit_function)
    325 
    326 #
    327 # Some fork aware types
    328 #
    329 
    330 class ForkAwareThreadLock(object):
    331     def __init__(self):
    332         self._reset()
    333         register_after_fork(self, ForkAwareThreadLock._reset)
    334 
    335     def _reset(self):
    336         self._lock = threading.Lock()
    337         self.acquire = self._lock.acquire
    338         self.release = self._lock.release
    339 
    340 class ForkAwareLocal(threading.local):
    341     def __init__(self):
    342         register_after_fork(self, lambda obj : obj.__dict__.clear())
    343     def __reduce__(self):
    344         return type(self), ()
    345