Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module for starting a process object using os.fork() or CreateProcess()
      3 #
      4 # multiprocessing/forking.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # All rights reserved.
      8 #
      9 # Redistribution and use in source and binary forms, with or without
     10 # modification, are permitted provided that the following conditions
     11 # are met:
     12 #
     13 # 1. Redistributions of source code must retain the above copyright
     14 #    notice, this list of conditions and the following disclaimer.
     15 # 2. Redistributions in binary form must reproduce the above copyright
     16 #    notice, this list of conditions and the following disclaimer in the
     17 #    documentation and/or other materials provided with the distribution.
     18 # 3. Neither the name of author nor the names of any contributors may be
     19 #    used to endorse or promote products derived from this software
     20 #    without specific prior written permission.
     21 #
     22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     25 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     32 # SUCH DAMAGE.
     33 #
     34 
     35 import os
     36 import sys
     37 import signal
     38 import errno
     39 
     40 from multiprocessing import util, process
     41 
# Names exported by "from multiprocessing.forking import *".
__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
     43 
     44 #
     45 # Check that the current thread is spawning a child process
     46 #
     47 
     48 def assert_spawning(self):
     49     if not Popen.thread_is_spawning():
     50         raise RuntimeError(
     51             '%s objects should only be shared between processes'
     52             ' through inheritance' % type(self).__name__
     53             )
     54 
     55 #
     56 # Try making some callable types picklable
     57 #
     58 
     59 from pickle import Pickler
     60 class ForkingPickler(Pickler):
     61     dispatch = Pickler.dispatch.copy()
     62 
     63     @classmethod
     64     def register(cls, type, reduce):
     65         def dispatcher(self, obj):
     66             rv = reduce(obj)
     67             self.save_reduce(obj=obj, *rv)
     68         cls.dispatch[type] = dispatcher
     69 
     70 def _reduce_method(m):
     71     if m.im_self is None:
     72         return getattr, (m.im_class, m.im_func.func_name)
     73     else:
     74         return getattr, (m.im_self, m.im_func.func_name)
     75 ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
     76 
     77 def _reduce_method_descriptor(m):
     78     return getattr, (m.__objclass__, m.__name__)
     79 ForkingPickler.register(type(list.append), _reduce_method_descriptor)
     80 ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
     81 
     82 #def _reduce_builtin_function_or_method(m):
     83 #    return getattr, (m.__self__, m.__name__)
     84 #ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
     85 #ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
     86 
     87 try:
     88     from functools import partial
     89 except ImportError:
     90     pass
     91 else:
     92     def _reduce_partial(p):
     93         return _rebuild_partial, (p.func, p.args, p.keywords or {})
     94     def _rebuild_partial(func, args, keywords):
     95         return partial(func, *args, **keywords)
     96     ForkingPickler.register(partial, _reduce_partial)
     97 
     98 #
     99 # Unix
    100 #
    101 
    102 if sys.platform != 'win32':
    103     import time
    104 
    # Unix implementations of the helpers listed in __all__.
    exit = os._exit
    duplicate = os.dup
    close = os.close
    108 
    109     #
    110     # We define a Popen class similar to the one from subprocess, but
    111     # whose constructor takes a process object as its argument.
    112     #
    113 
    class Popen(object):
        '''
        Start a child process with os.fork() to run the code of a process
        object

        Attributes:
            pid        -- value returned by os.fork() (0 inside the child)
            returncode -- None until the child has been reaped; afterwards
                          its exit status, or the negated signal number if
                          it was killed by a signal
        '''

        def __init__(self, process_obj):
            '''
            Fork, and in the child run process_obj._bootstrap() and exit
            with the code it returns
            '''
            # Flush before forking -- presumably so that output still
            # sitting in the buffers is not emitted by both processes.
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None

            self.pid = os.fork()
            if self.pid == 0:
                # Child: reseed the random module if the parent had already
                # imported it, then run the process object.
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(code)

        def poll(self, flag=os.WNOHANG):
            '''
            Return the exit code of the child, or None if it is not known

            `flag` is passed to os.waitpid(); the default os.WNOHANG makes
            the call non-blocking, 0 makes it block until the child exits.
            '''
            if self.returncode is None:
                while True:
                    try:
                        pid, sts = os.waitpid(self.pid, flag)
                    except OSError as e:
                        if e.errno == errno.EINTR:
                            # Interrupted by a signal: retry the wait.
                            continue
                        # Child process not yet created. See #1731717
                        # e.errno == errno.ECHILD == 10
                        return None
                    else:
                        break
                if pid == self.pid:
                    if os.WIFSIGNALED(sts):
                        self.returncode = -os.WTERMSIG(sts)
                    else:
                        assert os.WIFEXITED(sts)
                        self.returncode = os.WEXITSTATUS(sts)
            return self.returncode

        def wait(self, timeout=None):
            '''
            Wait for the child to exit and return its exit code

            With timeout=None this blocks in poll(0).  Otherwise the child
            is polled with increasing delays (capped at 0.05s) until it
            exits or `timeout` seconds have elapsed; None is returned if it
            is still running at that point.
            '''
            if timeout is None:
                return self.poll(0)
            deadline = time.time() + timeout
            delay = 0.0005
            while True:
                res = self.poll()
                if res is not None:
                    break
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                delay = min(delay * 2, remaining, 0.05)
                time.sleep(delay)
            return res

        def terminate(self):
            '''
            Send SIGTERM to the child unless its exit code is already known
            '''
            if self.returncode is None:
                try:
                    os.kill(self.pid, signal.SIGTERM)
                except OSError:
                    # The kill failed; only propagate the error if the child
                    # cannot be reaped shortly afterwards.
                    if self.wait(timeout=0.1) is None:
                        raise

        @staticmethod
        def thread_is_spawning():
            '''
            Always False for this fork based implementation
            '''
            return False
    179 
    180 #
    181 # Windows
    182 #
    183 
    184 else:
    185     import thread
    186     import msvcrt
    187     import _subprocess
    188     import time
    189 
    190     from _multiprocessing import win32, Connection, PipeConnection
    191     from .util import Finalize
    192 
    193     #try:
    194     #    from cPickle import dump, load, HIGHEST_PROTOCOL
    195     #except ImportError:
    196     from pickle import load, HIGHEST_PROTOCOL
    197 
    def dump(obj, file, protocol=None):
        '''
        Pickle `obj` to `file` using a ForkingPickler
        '''
        pickler = ForkingPickler(file, protocol)
        pickler.dump(obj)
    200 
    201     #
    202     #
    203     #
    204 
    # Exit code passed to TerminateProcess() by Popen.terminate(); Popen.wait()
    # reports it as -signal.SIGTERM.
    TERMINATE = 0x10000
    # Truthy when running as a frozen executable (sys.frozen is set).
    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
    # True when sys.executable is pythonservice.exe.
    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")

    # Windows implementations of the helpers listed in __all__.
    exit = win32.ExitProcess
    close = win32.CloseHandle
    211 
    212     #
    213     # _python_exe is the assumed path to the python executable.
    214     # People embedding Python want to modify it.
    215     #
    216 
    # Under pythonservice.exe sys.executable is the service host rather than
    # the interpreter, so use python.exe from sys.exec_prefix instead.
    if WINSERVICE:
        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
    else:
        _python_exe = sys.executable
    221 
    def set_executable(exe):
        '''
        Set the path of the python executable used to start child processes
        '''
        global _python_exe
        _python_exe = exe
    225 
    226     #
    227     #
    228     #
    229 
    def duplicate(handle, target_process=None, inheritable=False):
        '''
        Duplicate `handle` of the current process into `target_process`

        `target_process` defaults to the current process.  The new handle
        has the same access as the original and is returned detached.
        '''
        if target_process is None:
            target_process = _subprocess.GetCurrentProcess()
        source_process = _subprocess.GetCurrentProcess()
        new_handle = _subprocess.DuplicateHandle(
            source_process, handle, target_process,
            0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
            )
        return new_handle.Detach()
    237 
    238     #
    239     # We define a Popen class similar to the one from subprocess, but
    240     # whose constructor takes a process object as its argument.
    241     #
    242 
    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        # Thread-local storage.  Its `process_handle` attribute exists only
        # while __init__ is pickling data for a child, which is what
        # thread_is_spawning() tests for.
        _tls = thread._local()

        def __init__(self, process_obj):
            '''
            Spawn a new interpreter and send it, over a pipe, the preparation
            data followed by the pickled `process_obj`
            '''
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
            os.close(rfd)

            # start process; the inheritable handle is appended as the last
            # command line argument, where main() reads it from sys.argv[-1]
            cmd = get_command_line() + [rhandle]
            cmd = ' '.join('"%s"' % x for x in cmd)
            hp, ht, pid, tid = _subprocess.CreateProcess(
                _python_exe, cmd, None, None, 1, 0, None, None, None
                )
            # the thread handle and the parent's copy of the read handle are
            # not used after this point
            ht.Close()
            close(rhandle)

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            # mark this thread as spawning so that reducers invoked while
            # pickling can duplicate handles into the child process
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

        @staticmethod
        def thread_is_spawning():
            '''
            Return whether the current thread is inside Popen.__init__,
            pickling data for a child process
            '''
            return getattr(Popen._tls, 'process_handle', None) is not None

        @staticmethod
        def duplicate_for_child(handle):
            '''
            Duplicate `handle` into the child process being spawned by the
            current thread
            '''
            return duplicate(handle, Popen._tls.process_handle)

        def wait(self, timeout=None):
            '''
            Wait up to `timeout` seconds (forever if None) for the child to
            exit; return its exit code, or None if it is still running
            '''
            if self.returncode is None:
                if timeout is None:
                    msecs = _subprocess.INFINITE
                else:
                    # seconds -> whole milliseconds, rounded, never negative
                    msecs = max(0, int(timeout * 1000 + 0.5))

                res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
                if res == _subprocess.WAIT_OBJECT_0:
                    code = _subprocess.GetExitCodeProcess(self._handle)
                    # TERMINATE is the exit code used by terminate(); report
                    # it as -SIGTERM
                    if code == TERMINATE:
                        code = -signal.SIGTERM
                    self.returncode = code

            return self.returncode

        def poll(self):
            '''
            Return the exit code of the child without blocking, or None if
            it is still running
            '''
            return self.wait(timeout=0)

        def terminate(self):
            '''
            Terminate the child with exit code TERMINATE unless its exit code
            is already known
            '''
            if self.returncode is None:
                try:
                    _subprocess.TerminateProcess(int(self._handle), TERMINATE)
                except WindowsError:
                    # only propagate the error if the child does not turn out
                    # to have exited shortly afterwards
                    if self.wait(timeout=0.1) is None:
                        raise
    316 
    317     #
    318     #
    319     #
    320 
    def is_forking(argv):
        '''
        Return whether commandline indicates we are forking

        True when argv[1] is '--multiprocessing-fork'; argv is then expected
        to hold exactly three items.
        '''
        forking = len(argv) >= 2 and argv[1] == '--multiprocessing-fork'
        if forking:
            assert len(argv) == 3
        return forking
    330 
    331 
    def freeze_support():
        '''
        Run code for process object if this is not the main process

        Does nothing unless the command line marks this interpreter as a
        spawned child; in that case main() is run and the interpreter exits.
        '''
        if not is_forking(sys.argv):
            return
        main()
        sys.exit()
    339 
    340 
    def get_command_line():
        '''
        Return the prefix of the command line used for spawning a child
        process

        Raises RuntimeError if the current process is itself still being
        bootstrapped (its `_inheriting` flag is set).
        '''
        bootstrapping = getattr(process.current_process(), '_inheriting', False)
        if bootstrapping:
            raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

        if not getattr(sys, 'frozen', False):
            # Normal interpreter: run main() of this module via "-c",
            # passing on the interpreter flags of the current process.
            prog = 'from multiprocessing.forking import main; main()'
            opts = util._args_from_interpreter_flags()
            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']

        # Frozen executable: re-run the executable itself.
        return [sys.executable, '--multiprocessing-fork']
    366 
    367 
    def main():
        '''
        Run code specified by data received over pipe

        The last command line argument is the handle of the pipe from which
        the preparation data and then the process object are unpickled.
        '''
        assert is_forking(sys.argv)

        pipe_handle = int(sys.argv[-1])
        pipe_fd = msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY)
        from_parent = os.fdopen(pipe_fd, 'rb')

        # While _inheriting is set, get_command_line() refuses to spawn
        # further processes.
        process.current_process()._inheriting = True
        prepare(load(from_parent))
        process_obj = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exit(process_obj._bootstrap())
    387         exit(exitcode)
    388 
    389 
    def get_preparation_data(name):
        '''
        Return info about parent needed by child to unpickle process object

        The result is a dict, consumed by prepare() in the child.
        '''
        from .util import _logger, _log_to_stderr

        info = {
            'name': name,
            'sys_path': sys.path,
            'sys_argv': sys.argv,
            'log_to_stderr': _log_to_stderr,
            'orig_dir': process.ORIGINAL_DIR,
            'authkey': process.current_process().authkey,
            }

        if _logger is not None:
            info['log_level'] = _logger.getEffectiveLevel()

        # No main module path is sent for frozen executables or services.
        if WINEXE or WINSERVICE:
            return info

        main_path = getattr(sys.modules['__main__'], '__file__', None)
        if not main_path and sys.argv[0] not in ('', '-c'):
            main_path = sys.argv[0]
        if main_path is None:
            return info

        # Relative paths are resolved against the original working directory.
        if process.ORIGINAL_DIR is not None and not os.path.isabs(main_path):
            main_path = os.path.join(process.ORIGINAL_DIR, main_path)
        info['main_path'] = os.path.normpath(main_path)
        return info
    419 
    420     #
    421     # Make (Pipe)Connection picklable
    422     #
    423 
    def reduce_connection(conn):
        '''
        Reduce a connection to its type plus a handle duplicated into the
        child process being spawned

        Raises RuntimeError when the current thread is not spawning a child.
        '''
        if not Popen.thread_is_spawning():
            type_name = type(conn).__name__
            raise RuntimeError(
                'By default %s objects can only be shared between processes\n'
                'using inheritance' % type_name
                )
        child_handle = Popen.duplicate_for_child(conn.fileno())
        return type(conn), (child_handle, conn.readable, conn.writable)

    for _conn_type in (Connection, PipeConnection):
        ForkingPickler.register(_conn_type, reduce_connection)
    del _conn_type
    435 
    436 #
    437 # Prepare current process
    438 #
    439 
    440 old_main_modules = []
    441 
    442 def prepare(data):
    443     '''
    444     Try to get current process ready to unpickle process object
    445     '''
    446     old_main_modules.append(sys.modules['__main__'])
    447 
    448     if 'name' in data:
    449         process.current_process().name = data['name']
    450 
    451     if 'authkey' in data:
    452         process.current_process()._authkey = data['authkey']
    453 
    454     if 'log_to_stderr' in data and data['log_to_stderr']:
    455         util.log_to_stderr()
    456 
    457     if 'log_level' in data:
    458         util.get_logger().setLevel(data['log_level'])
    459 
    460     if 'sys_path' in data:
    461         sys.path = data['sys_path']
    462 
    463     if 'sys_argv' in data:
    464         sys.argv = data['sys_argv']
    465 
    466     if 'dir' in data:
    467         os.chdir(data['dir'])
    468 
    469     if 'orig_dir' in data:
    470         process.ORIGINAL_DIR = data['orig_dir']
    471 
    472     if 'main_path' in data:
    473         main_path = data['main_path']
    474         main_name = os.path.splitext(os.path.basename(main_path))[0]
    475         if main_name == '__init__':
    476             main_name = os.path.basename(os.path.dirname(main_path))
    477 
    478         if main_name != 'ipython':
    479             import imp
    480 
    481             if main_path is None:
    482                 dirs = None
    483             elif os.path.basename(main_path).startswith('__init__.py'):
    484                 dirs = [os.path.dirname(os.path.dirname(main_path))]
    485             else:
    486                 dirs = [os.path.dirname(main_path)]
    487 
    488             assert main_name not in sys.modules, main_name
    489             file, path_name, etc = imp.find_module(main_name, dirs)
    490             try:
    491                 # We would like to do "imp.load_module('__main__', ...)"
    492                 # here.  However, that would cause 'if __name__ ==
    493                 # "__main__"' clauses to be executed.
    494                 main_module = imp.load_module(
    495                     '__parents_main__', file, path_name, etc
    496                     )
    497             finally:
    498                 if file:
    499                     file.close()
    500 
    501             sys.modules['__main__'] = main_module
    502             main_module.__name__ = '__main__'
    503 
    504             # Try to make the potentially picklable objects in
    505             # sys.modules['__main__'] realize they are in the main
    506             # module -- somewhat ugly.
    507             for obj in main_module.__dict__.values():
    508                 try:
    509                     if obj.__module__ == '__parents_main__':
    510                         obj.__module__ = '__main__'
    511                 except Exception:
    512                     pass
    513