#
# Module for starting a process object using os.fork() or CreateProcess()
#
# multiprocessing/forking.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import os
import sys
import signal
import errno

from multiprocessing import util, process

__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child process

    `self` is the object being shared; only its type name is used, to
    build the error message.
    '''
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    '''
    Pickler with a private copy of the dispatch table, so that reducers
    registered here do not modify `Pickler.dispatch`
    '''
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Register `reduce` as the reduction function for objects of `type`

        `reduce(obj)` must return a tuple which is unpacked as the
        positional arguments of `save_reduce()`.
        '''
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    '''
    Reduce a method to `getattr(owner, name)`; the owner is the class for
    an unbound method and the instance for a bound one
    '''
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

def _reduce_method_descriptor(m):
    '''
    Reduce a method descriptor (e.g. `list.append`) to a `getattr` on the
    class which defines it
    '''
    return getattr, (m.__objclass__, m.__name__)
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)

#def _reduce_builtin_function_or_method(m):
#    return getattr, (m.__self__, m.__name__)
#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)

try:
    from functools import partial
except ImportError:
    pass
else:
    def _reduce_partial(p):
        '''
        Reduce a `partial` object to a call of `_rebuild_partial()`
        '''
        return _rebuild_partial, (p.func, p.args, p.keywords or {})
    def _rebuild_partial(func, args, keywords):
        '''
        Recreate a `partial` object from its function and arguments
        '''
        return partial(func, *args, **keywords)
    ForkingPickler.register(partial, _reduce_partial)

#
# Unix
#

if sys.platform != 'win32':
    import time

    exit = os._exit
    duplicate = os.dup
    close = os.close

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a child with os.fork() to run the code of a process object
        '''

        def __init__(self, process_obj):
            # flush before forking so that buffered output is not
            # duplicated in the child
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None

            self.pid = os.fork()
            if self.pid == 0:
                # in the child: reseed `random` (if already imported) so
                # that the child does not share the parent's state
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(code)

        def poll(self, flag=os.WNOHANG):
            '''
            Return the exit code of the child, or None if it is not known

            `flag` is passed to `os.waitpid()`.  A child killed by a
            signal is reported as the negated signal number.
            '''
            if self.returncode is None:
                while True:
                    try:
                        pid, sts = os.waitpid(self.pid, flag)
                    except os.error as e:
                        if e.errno == errno.EINTR:
                            continue
                        # Child process not yet created. See #1731717
                        # e.errno == errno.ECHILD == 10
                        return None
                    else:
                        break
                if pid == self.pid:
                    if os.WIFSIGNALED(sts):
                        self.returncode = -os.WTERMSIG(sts)
                    else:
                        assert os.WIFEXITED(sts)
                        self.returncode = os.WEXITSTATUS(sts)
            return self.returncode

        def wait(self, timeout=None):
            '''
            Wait for the child and return its exit code

            With `timeout` None this blocks in `os.waitpid()`.  Otherwise
            the child is polled until it exits or `timeout` seconds have
            passed, and None is returned if it is still running.
            '''
            if timeout is None:
                return self.poll(0)
            deadline = time.time() + timeout
            delay = 0.0005
            while True:
                res = self.poll()
                if res is not None:
                    break
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                # back off exponentially, but sleep at most 0.05s and
                # never beyond the deadline
                delay = min(delay * 2, remaining, 0.05)
                time.sleep(delay)
            return res

        def terminate(self):
            '''
            Send SIGTERM to the child if no exit code is known for it yet
            '''
            if self.returncode is None:
                try:
                    os.kill(self.pid, signal.SIGTERM)
                except OSError:
                    # only propagate the error if the child has still
                    # not exited after a short wait
                    if self.wait(timeout=0.1) is None:
                        raise

        @staticmethod
        def thread_is_spawning():
            '''
            Always False in this implementation
            '''
            return False

#
# Windows
#

else:
    import thread
    import msvcrt
    import _subprocess
    import time

    from _multiprocessing import win32, Connection, PipeConnection
    from .util import Finalize

    #try:
    #    from cPickle import dump, load, HIGHEST_PROTOCOL
    #except ImportError:
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        '''
        Pickle `obj` to `file` using ForkingPickler
        '''
        ForkingPickler(file, protocol).dump(obj)

    #
    #
    #

    # exit code passed to TerminateProcess(); Popen.wait() reports it
    # as -signal.SIGTERM
    TERMINATE = 0x10000
    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")

    exit = win32.ExitProcess
    close = win32.CloseHandle

    #
    # _python_exe is the assumed path to the python executable.
    # People embedding Python want to modify it.
    #

    if WINSERVICE:
        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
    else:
        _python_exe = sys.executable

    def set_executable(exe):
        '''
        Set the path of the executable used to start child processes
        '''
        global _python_exe
        _python_exe = exe

    #
    #
    #

    def duplicate(handle, target_process=None, inheritable=False):
        '''
        Duplicate `handle` into `target_process` and return the new handle

        `target_process` defaults to the current process.
        '''
        if target_process is None:
            target_process = _subprocess.GetCurrentProcess()
        return _subprocess.DuplicateHandle(
            _subprocess.GetCurrentProcess(), handle, target_process,
            0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
            ).Detach()

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            try:
                # get handle for read end of the pipe and make it
                # inheritable; the original descriptor is closed even
                # if duplication fails
                try:
                    rhandle = duplicate(msvcrt.get_osfhandle(rfd),
                                        inheritable=True)
                finally:
                    os.close(rfd)

                # start process; our copy of the inheritable handle is
                # released whether or not the child could be created
                try:
                    cmd = get_command_line() + [rhandle]
                    cmd = ' '.join('"%s"' % x for x in cmd)
                    hp, ht, pid, tid = _subprocess.CreateProcess(
                        _python_exe, cmd, None, None, 1, 0, None, None, None
                        )
                finally:
                    close(rhandle)
                ht.Close()
            except:
                # do not leak the write end of the pipe on failure
                os.close(wfd)
                raise

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp

            # send information to child; the pipe is closed even if
            # gathering or pickling the data fails
            to_child = os.fdopen(wfd, 'wb')
            try:
                prep_data = get_preparation_data(process_obj._name)
                Popen._tls.process_handle = int(hp)
                try:
                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
                finally:
                    del Popen._tls.process_handle
            finally:
                to_child.close()

        @staticmethod
        def thread_is_spawning():
            '''
            Return whether this thread is currently pickling data for a
            child process
            '''
            return getattr(Popen._tls, 'process_handle', None) is not None

        @staticmethod
        def duplicate_for_child(handle):
            '''
            Duplicate `handle` into the child process being started by
            this thread
            '''
            return duplicate(handle, Popen._tls.process_handle)

        def wait(self, timeout=None):
            '''
            Wait for the child and return its exit code

            `timeout` is in seconds; None means wait forever.  None is
            returned if the child has not exited within the timeout.
            '''
            if self.returncode is None:
                if timeout is None:
                    msecs = _subprocess.INFINITE
                else:
                    msecs = max(0, int(timeout * 1000 + 0.5))

                res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
                if res == _subprocess.WAIT_OBJECT_0:
                    code = _subprocess.GetExitCodeProcess(self._handle)
                    if code == TERMINATE:
                        code = -signal.SIGTERM
                    self.returncode = code

            return self.returncode

        def poll(self):
            '''
            Return the exit code of the child without blocking, or None
            '''
            return self.wait(timeout=0)

        def terminate(self):
            '''
            Terminate the child if no exit code is known for it yet
            '''
            if self.returncode is None:
                try:
                    _subprocess.TerminateProcess(int(self._handle), TERMINATE)
                except WindowsError:
                    # only propagate the error if the child has still
                    # not exited after a short wait
                    if self.wait(timeout=0.1) is None:
                        raise

    #
    #
    #

    def is_forking(argv):
        '''
        Return whether commandline indicates we are forking
        '''
        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
            assert len(argv) == 3
            return True
        else:
            return False


    def freeze_support():
        '''
        Run code for process object if this is not the main process
        '''
        if is_forking(sys.argv):
            main()
            sys.exit()


    def get_command_line():
        '''
        Returns prefix of command line used for spawning a child process
        '''
        if getattr(process.current_process(), '_inheriting', False):
            raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

        if getattr(sys, 'frozen', False):
            return [sys.executable, '--multiprocessing-fork']
        else:
            prog = 'from multiprocessing.forking import main; main()'
            opts = util._args_from_interpreter_flags()
            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']


    def main():
        '''
        Run code specified by data received over pipe
        '''
        assert is_forking(sys.argv)

        # the last argument is the pipe handle appended by Popen.__init__
        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        # NOTE: the data is unpickled as received; it is assumed to come
        # from the parent process which started this one
        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)


    def get_preparation_data(name):
        '''
        Return info about parent needed by child to unpickle process object
        '''
        from .util import _logger, _log_to_stderr

        d = dict(
            name=name,
            sys_path=sys.path,
            sys_argv=sys.argv,
            log_to_stderr=_log_to_stderr,
            orig_dir=process.ORIGINAL_DIR,
            authkey=process.current_process().authkey,
            )

        if _logger is not None:
            d['log_level'] = _logger.getEffectiveLevel()

        if not WINEXE and not WINSERVICE:
            main_path = getattr(sys.modules['__main__'], '__file__', None)
            if not main_path and sys.argv[0] not in ('', '-c'):
                main_path = sys.argv[0]
            if main_path is not None:
                if not os.path.isabs(main_path) and \
                        process.ORIGINAL_DIR is not None:
                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
                d['main_path'] = os.path.normpath(main_path)

        return d

    #
    # Make (Pipe)Connection picklable
    #

    def reduce_connection(conn):
        '''
        Reduce a connection by duplicating its handle into the child

        Raises RuntimeError unless a child process is being spawned by
        the current thread.
        '''
        if not Popen.thread_is_spawning():
            raise RuntimeError(
                'By default %s objects can only be shared between processes\n'
                'using inheritance' % type(conn).__name__
                )
        return type(conn), (Popen.duplicate_for_child(conn.fileno()),
                            conn.readable, conn.writable)

    ForkingPickler.register(Connection, reduce_connection)
    ForkingPickler.register(PipeConnection, reduce_connection)

#
# Prepare current process
#

old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name == '__main__':
            # The parent was started from a file called __main__.py.
            # '__main__' is always in sys.modules, so importing it under
            # that name would trip the assertion below; reuse the
            # existing main module instead.
            main_module = sys.modules['__main__']
            main_module.__file__ = main_path
        elif main_name != 'ipython':
            import imp

            if main_path is None:
                dirs = None
            elif os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                    )
            finally:
                if file:
                    file.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in main_module.__dict__.values():
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    pass