Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module providing the `Process` class which emulates `threading.Thread`
      3 #
      4 # multiprocessing/process.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # Licensed to PSF under a Contributor Agreement.
      8 #
      9 
     10 __all__ = ['BaseProcess', 'current_process', 'active_children']
     11 
     12 #
     13 # Imports
     14 #
     15 
     16 import os
     17 import sys
     18 import signal
     19 import itertools
     20 import threading
     21 from _weakrefset import WeakSet
     22 
     23 #
     24 #
     25 #
     26 
     27 try:
     28     ORIGINAL_DIR = os.path.abspath(os.getcwd())
     29 except OSError:
     30     ORIGINAL_DIR = None
     31 
     32 #
     33 # Public functions
     34 #
     35 
     36 def current_process():
     37     '''
     38     Return process object representing the current process
     39     '''
     40     return _current_process
     41 
     42 def active_children():
     43     '''
     44     Return list of process objects corresponding to live child processes
     45     '''
     46     _cleanup()
     47     return list(_children)
     48 
     49 #
     50 #
     51 #
     52 
     53 def _cleanup():
     54     # check for processes which have finished
     55     for p in list(_children):
     56         if p._popen.poll() is not None:
     57             _children.discard(p)
     58 
     59 #
     60 # The `Process` class
     61 #
     62 
     63 class BaseProcess(object):
     64     '''
     65     Process objects represent activity that is run in a separate process
     66 
     67     The class is analogous to `threading.Thread`
     68     '''
     69     def _Popen(self):
     70         raise NotImplementedError
     71 
     72     def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
     73                  *, daemon=None):
     74         assert group is None, 'group argument must be None for now'
     75         count = next(_process_counter)
     76         self._identity = _current_process._identity + (count,)
     77         self._config = _current_process._config.copy()
     78         self._parent_pid = os.getpid()
     79         self._popen = None
     80         self._closed = False
     81         self._target = target
     82         self._args = tuple(args)
     83         self._kwargs = dict(kwargs)
     84         self._name = name or type(self).__name__ + '-' + \
     85                      ':'.join(str(i) for i in self._identity)
     86         if daemon is not None:
     87             self.daemon = daemon
     88         _dangling.add(self)
     89 
     90     def _check_closed(self):
     91         if self._closed:
     92             raise ValueError("process object is closed")
     93 
     94     def run(self):
     95         '''
     96         Method to be run in sub-process; can be overridden in sub-class
     97         '''
     98         if self._target:
     99             self._target(*self._args, **self._kwargs)
    100 
    101     def start(self):
    102         '''
    103         Start child process
    104         '''
    105         self._check_closed()
    106         assert self._popen is None, 'cannot start a process twice'
    107         assert self._parent_pid == os.getpid(), \
    108                'can only start a process object created by current process'
    109         assert not _current_process._config.get('daemon'), \
    110                'daemonic processes are not allowed to have children'
    111         _cleanup()
    112         self._popen = self._Popen(self)
    113         self._sentinel = self._popen.sentinel
    114         # Avoid a refcycle if the target function holds an indirect
    115         # reference to the process object (see bpo-30775)
    116         del self._target, self._args, self._kwargs
    117         _children.add(self)
    118 
    119     def terminate(self):
    120         '''
    121         Terminate process; sends SIGTERM signal or uses TerminateProcess()
    122         '''
    123         self._check_closed()
    124         self._popen.terminate()
    125 
    126     def kill(self):
    127         '''
    128         Terminate process; sends SIGKILL signal or uses TerminateProcess()
    129         '''
    130         self._check_closed()
    131         self._popen.kill()
    132 
    133     def join(self, timeout=None):
    134         '''
    135         Wait until child process terminates
    136         '''
    137         self._check_closed()
    138         assert self._parent_pid == os.getpid(), 'can only join a child process'
    139         assert self._popen is not None, 'can only join a started process'
    140         res = self._popen.wait(timeout)
    141         if res is not None:
    142             _children.discard(self)
    143 
    144     def is_alive(self):
    145         '''
    146         Return whether process is alive
    147         '''
    148         self._check_closed()
    149         if self is _current_process:
    150             return True
    151         assert self._parent_pid == os.getpid(), 'can only test a child process'
    152 
    153         if self._popen is None:
    154             return False
    155 
    156         returncode = self._popen.poll()
    157         if returncode is None:
    158             return True
    159         else:
    160             _children.discard(self)
    161             return False
    162 
    163     def close(self):
    164         '''
    165         Close the Process object.
    166 
    167         This method releases resources held by the Process object.  It is
    168         an error to call this method if the child process is still running.
    169         '''
    170         if self._popen is not None:
    171             if self._popen.poll() is None:
    172                 raise ValueError("Cannot close a process while it is still running. "
    173                                  "You should first call join() or terminate().")
    174             self._popen.close()
    175             self._popen = None
    176             del self._sentinel
    177             _children.discard(self)
    178         self._closed = True
    179 
    180     @property
    181     def name(self):
    182         return self._name
    183 
    184     @name.setter
    185     def name(self, name):
    186         assert isinstance(name, str), 'name must be a string'
    187         self._name = name
    188 
    189     @property
    190     def daemon(self):
    191         '''
    192         Return whether process is a daemon
    193         '''
    194         return self._config.get('daemon', False)
    195 
    196     @daemon.setter
    197     def daemon(self, daemonic):
    198         '''
    199         Set whether process is a daemon
    200         '''
    201         assert self._popen is None, 'process has already started'
    202         self._config['daemon'] = daemonic
    203 
    204     @property
    205     def authkey(self):
    206         return self._config['authkey']
    207 
    208     @authkey.setter
    209     def authkey(self, authkey):
    210         '''
    211         Set authorization key of process
    212         '''
    213         self._config['authkey'] = AuthenticationString(authkey)
    214 
    215     @property
    216     def exitcode(self):
    217         '''
    218         Return exit code of process or `None` if it has yet to stop
    219         '''
    220         self._check_closed()
    221         if self._popen is None:
    222             return self._popen
    223         return self._popen.poll()
    224 
    225     @property
    226     def ident(self):
    227         '''
    228         Return identifier (PID) of process or `None` if it has yet to start
    229         '''
    230         self._check_closed()
    231         if self is _current_process:
    232             return os.getpid()
    233         else:
    234             return self._popen and self._popen.pid
    235 
    236     pid = ident
    237 
    238     @property
    239     def sentinel(self):
    240         '''
    241         Return a file descriptor (Unix) or handle (Windows) suitable for
    242         waiting for process termination.
    243         '''
    244         self._check_closed()
    245         try:
    246             return self._sentinel
    247         except AttributeError:
    248             raise ValueError("process not started") from None
    249 
    250     def __repr__(self):
    251         if self is _current_process:
    252             status = 'started'
    253         elif self._closed:
    254             status = 'closed'
    255         elif self._parent_pid != os.getpid():
    256             status = 'unknown'
    257         elif self._popen is None:
    258             status = 'initial'
    259         else:
    260             if self._popen.poll() is not None:
    261                 status = self.exitcode
    262             else:
    263                 status = 'started'
    264 
    265         if type(status) is int:
    266             if status == 0:
    267                 status = 'stopped'
    268             else:
    269                 status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
    270 
    271         return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
    272                                    status, self.daemon and ' daemon' or '')
    273 
    274     ##
    275 
    276     def _bootstrap(self):
    277         from . import util, context
    278         global _current_process, _process_counter, _children
    279 
    280         try:
    281             if self._start_method is not None:
    282                 context._force_start_method(self._start_method)
    283             _process_counter = itertools.count(1)
    284             _children = set()
    285             util._close_stdin()
    286             old_process = _current_process
    287             _current_process = self
    288             try:
    289                 util._finalizer_registry.clear()
    290                 util._run_after_forkers()
    291             finally:
    292                 # delay finalization of the old process object until after
    293                 # _run_after_forkers() is executed
    294                 del old_process
    295             util.info('child process calling self.run()')
    296             try:
    297                 self.run()
    298                 exitcode = 0
    299             finally:
    300                 util._exit_function()
    301         except SystemExit as e:
    302             if not e.args:
    303                 exitcode = 1
    304             elif isinstance(e.args[0], int):
    305                 exitcode = e.args[0]
    306             else:
    307                 sys.stderr.write(str(e.args[0]) + '\n')
    308                 exitcode = 1
    309         except:
    310             exitcode = 1
    311             import traceback
    312             sys.stderr.write('Process %s:\n' % self.name)
    313             traceback.print_exc()
    314         finally:
    315             threading._shutdown()
    316             util.info('process exiting with exitcode %d' % exitcode)
    317             util._flush_std_streams()
    318 
    319         return exitcode
    320 
    321 #
    322 # We subclass bytes to avoid accidental transmission of auth keys over network
    323 #
    324 
    325 class AuthenticationString(bytes):
    326     def __reduce__(self):
    327         from .context import get_spawning_popen
    328         if get_spawning_popen() is None:
    329             raise TypeError(
    330                 'Pickling an AuthenticationString object is '
    331                 'disallowed for security reasons'
    332                 )
    333         return AuthenticationString, (bytes(self),)
    334 
    335 #
    336 # Create object representing the main process
    337 #
    338 
    339 class _MainProcess(BaseProcess):
    340 
    341     def __init__(self):
    342         self._identity = ()
    343         self._name = 'MainProcess'
    344         self._parent_pid = None
    345         self._popen = None
    346         self._closed = False
    347         self._config = {'authkey': AuthenticationString(os.urandom(32)),
    348                         'semprefix': '/mp'}
    349         # Note that some versions of FreeBSD only allow named
    350         # semaphores to have names of up to 14 characters.  Therefore
    351         # we choose a short prefix.
    352         #
    353         # On MacOSX in a sandbox it may be necessary to use a
    354         # different prefix -- see #19478.
    355         #
    356         # Everything in self._config will be inherited by descendant
    357         # processes.
    358 
    359     def close(self):
    360         pass
    361 
    362 
    363 _current_process = _MainProcess()
    364 _process_counter = itertools.count(1)
    365 _children = set()
    366 del _MainProcess
    367 
    368 #
    369 # Give names to some return codes
    370 #
    371 
    372 _exitcode_to_name = {}
    373 
    374 for name, signum in list(signal.__dict__.items()):
    375     if name[:3]=='SIG' and '_' not in name:
    376         _exitcode_to_name[-signum] = name
    377 
# For debug and leak testing: BaseProcess.__init__ adds every instance it
# initialises to this weak set, so membership alone does not keep a
# process object alive.
_dangling = WeakSet()
    380