#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import os
import itertools
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from multiprocessing.process import current_process, active_children

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging, atexit

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0
            logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
            logging.addLevelName(SUBWARNING, 'SUBWARNING')

            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger

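# Illustrative sketch (not part of the original module): how a user script
# might switch this logging machinery on.  Choosing logging.DEBUG here is an
# assumption; any level works, including SUBDEBUG/SUBWARNING defined above.
#
#   import multiprocessing, logging
#
#   logger = multiprocessing.log_to_stderr()   # wraps log_to_stderr() above
#   logger.setLevel(logging.DEBUG)             # or SUBDEBUG for maximum detail
#   logger.warning('messages are tagged with the process name')
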
#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    if current_process()._tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        current_process()._tempdir = tempdir
    return current_process()._tempdir

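# Illustrative sketch (not part of the original module): get_temp_dir() hands
# back a per-process 'pymp-*' directory and registers a Finalize that removes
# it with exit priority -100, i.e. after ordinary user finalizers have run.
#
#   from multiprocessing import util
#
#   d = util.get_temp_dir()        # e.g. a freshly created /tmp/pymp-... dir
#   # files created under d are discarded when this process shuts down
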
    145 #
    146 # Support for reinitialization of objects when bootstrapping a child process
    147 #
    148 
    149 _afterfork_registry = weakref.WeakValueDictionary()
    150 _afterfork_counter = itertools.count()
    151 
    152 def _run_after_forkers():
    153     items = list(_afterfork_registry.items())
    154     items.sort()
    155     for (index, ident, func), obj in items:
    156         try:
    157             func(obj)
    158         except Exception, e:
    159             info('after forker raised exception %s', e)
    160 
    161 def register_after_fork(obj, func):
    162     _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
    163 
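# Illustrative sketch (not part of the original module): objects that cache
# per-process state can ask to be reset in forked children.  The callback
# receives the registered object; the registry holds it only weakly, so the
# entry disappears once the object is garbage collected.  ConnectionCache is
# a hypothetical user class used purely for illustration.
#
#   class ConnectionCache(object):
#       def __init__(self):
#           self._conns = {}
#           register_after_fork(self, ConnectionCache._clear)
#       def _clear(self):
#           self._conns.clear()    # a child must not reuse the parent's sockets
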
#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != os.getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
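# Illustrative sketch (not part of the original module): Finalize ties clean-up
# to an object's lifetime (via the weakref callback) and, when exitpriority is
# given, also to interpreter shutdown; higher exitpriority runs earlier.  The
# Owner class and the choice of priority 10 are assumptions for the example.
#
#   import os, tempfile
#
#   class Owner(object):
#       pass
#
#   fd, path = tempfile.mkstemp()
#   owner = Owner()
#   f = Finalize(owner, os.remove, args=[path], exitpriority=10)
#   f.still_active()     # -> True while the callback is still pending
#   f.cancel()           # give up: the file will no longer be removed for us
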

def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0][0] is not None
    else:
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority

    items = [x for x in _finalizer_registry.items() if f(x)]
    items.sort(reverse=True)

    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # NB: we check if the current process is None here because if
        # it's None, any call to ``active_children()`` will throw an
        # AttributeError (active_children winds up trying to get
        # attributes from util._current_process).  This happens in a
        # variety of shutdown circumstances that are not well-understood
        # because module-scope variables are not apparently supposed to
        # be destroyed until after this function is called.  However,
        # they are indeed destroyed before this function is called.  See
        # issues 9775 and 15881.  Also related: 4106, 9205, and 9207.

        for p in active_children():
            if p._daemonic:
                info('calling terminate() for daemon %s', p.name)
                p._popen.terminate()

        for p in active_children():
            info('calling join() for process %s', p.name)
            p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

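# Illustrative sketch (not part of the original module): a ForkAwareThreadLock
# is used like threading.Lock(), but register_after_fork() swaps in a fresh
# lock in the child, so a lock held by another thread in the parent at fork
# time cannot deadlock the child.  The helper below is purely hypothetical.
#
#   _lock = ForkAwareThreadLock()
#
#   def append_safely(buf, item):
#       _lock.acquire()
#       try:
#           buf.append(item)
#       finally:
#           _lock.release()
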
class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()
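
# Illustrative sketch (not part of the original module): ForkAwareLocal is a
# threading.local whose contents are wiped in forked children, and whose
# pickled form is just an empty instance (see __reduce__ above).  The cached
# "handle" here is a stand-in for any per-process resource.
#
#   _state = ForkAwareLocal()
#
#   def get_cached_handle():
#       # recompute lazily; a fork or a pickle round-trip empties the cache
#       if not hasattr(_state, 'handle'):
#           _state.handle = open(os.devnull, 'w')      # placeholder resource
#       return _state.handle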