1 import unittest 2 from test import test_support 3 import subprocess 4 import sys 5 import signal 6 import os 7 import errno 8 import tempfile 9 import time 10 import re 11 import sysconfig 12 13 mswindows = (sys.platform == "win32") 14 15 # 16 # Depends on the following external programs: Python 17 # 18 19 if mswindows: 20 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 21 'os.O_BINARY);') 22 else: 23 SETBINARY = '' 24 25 26 try: 27 mkstemp = tempfile.mkstemp 28 except AttributeError: 29 # tempfile.mkstemp is not available 30 def mkstemp(): 31 """Replacement for mkstemp, calling mktemp.""" 32 fname = tempfile.mktemp() 33 return os.open(fname, os.O_RDWR|os.O_CREAT), fname 34 35 36 class BaseTestCase(unittest.TestCase): 37 def setUp(self): 38 # Try to minimize the number of children we have so this test 39 # doesn't crash on some buildbots (Alphas in particular). 40 test_support.reap_children() 41 42 def tearDown(self): 43 for inst in subprocess._active: 44 inst.wait() 45 subprocess._cleanup() 46 self.assertFalse(subprocess._active, "subprocess._active not empty") 47 48 def assertStderrEqual(self, stderr, expected, msg=None): 49 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 50 # shutdown time. That frustrates tests trying to check stderr produced 51 # from a spawned Python process. 52 actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) 53 self.assertEqual(actual, expected, msg) 54 55 56 class ProcessTestCase(BaseTestCase): 57 58 def test_call_seq(self): 59 # call() function with sequence argument 60 rc = subprocess.call([sys.executable, "-c", 61 "import sys; sys.exit(47)"]) 62 self.assertEqual(rc, 47) 63 64 def test_check_call_zero(self): 65 # check_call() function with zero return code 66 rc = subprocess.check_call([sys.executable, "-c", 67 "import sys; sys.exit(0)"]) 68 self.assertEqual(rc, 0) 69 70 def test_check_call_nonzero(self): 71 # check_call() function with non-zero return code 72 with self.assertRaises(subprocess.CalledProcessError) as c: 73 subprocess.check_call([sys.executable, "-c", 74 "import sys; sys.exit(47)"]) 75 self.assertEqual(c.exception.returncode, 47) 76 77 def test_check_output(self): 78 # check_output() function with zero return code 79 output = subprocess.check_output( 80 [sys.executable, "-c", "print 'BDFL'"]) 81 self.assertIn('BDFL', output) 82 83 def test_check_output_nonzero(self): 84 # check_call() function with non-zero return code 85 with self.assertRaises(subprocess.CalledProcessError) as c: 86 subprocess.check_output( 87 [sys.executable, "-c", "import sys; sys.exit(5)"]) 88 self.assertEqual(c.exception.returncode, 5) 89 90 def test_check_output_stderr(self): 91 # check_output() function stderr redirected to stdout 92 output = subprocess.check_output( 93 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 94 stderr=subprocess.STDOUT) 95 self.assertIn('BDFL', output) 96 97 def test_check_output_stdout_arg(self): 98 # check_output() function stderr redirected to stdout 99 with self.assertRaises(ValueError) as c: 100 output = subprocess.check_output( 101 [sys.executable, "-c", "print 'will not be run'"], 102 stdout=sys.stdout) 103 self.fail("Expected ValueError when stdout arg supplied.") 104 self.assertIn('stdout', c.exception.args[0]) 105 106 def test_call_kwargs(self): 107 # call() function with keyword args 108 newenv = os.environ.copy() 109 newenv["FRUIT"] = "banana" 110 rc = subprocess.call([sys.executable, "-c", 111 'import sys, os;' 112 'sys.exit(os.getenv("FRUIT")=="banana")'], 113 env=newenv) 114 self.assertEqual(rc, 1) 115 116 def test_stdin_none(self): 117 # .stdin is None when not redirected 118 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 119 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 120 self.addCleanup(p.stdout.close) 121 self.addCleanup(p.stderr.close) 122 p.wait() 123 self.assertEqual(p.stdin, None) 124 125 def test_stdout_none(self): 126 # .stdout is None when not redirected 127 p = subprocess.Popen([sys.executable, "-c", 128 'print " this bit of output is from a ' 129 'test of stdout in a different ' 130 'process ..."'], 131 stdin=subprocess.PIPE, stderr=subprocess.PIPE) 132 self.addCleanup(p.stdin.close) 133 self.addCleanup(p.stderr.close) 134 p.wait() 135 self.assertEqual(p.stdout, None) 136 137 def test_stderr_none(self): 138 # .stderr is None when not redirected 139 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 140 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 141 self.addCleanup(p.stdout.close) 142 self.addCleanup(p.stdin.close) 143 p.wait() 144 self.assertEqual(p.stderr, None) 145 146 def test_executable_with_cwd(self): 147 python_dir = os.path.dirname(os.path.realpath(sys.executable)) 148 p = subprocess.Popen(["somethingyoudonthave", "-c", 149 "import sys; sys.exit(47)"], 150 executable=sys.executable, cwd=python_dir) 151 p.wait() 152 self.assertEqual(p.returncode, 47) 153 154 @unittest.skipIf(sysconfig.is_python_build(), 155 "need an installed Python. See #7774") 156 def test_executable_without_cwd(self): 157 # For a normal installation, it should work without 'cwd' 158 # argument. For test runs in the build directory, see #7774. 159 p = subprocess.Popen(["somethingyoudonthave", "-c", 160 "import sys; sys.exit(47)"], 161 executable=sys.executable) 162 p.wait() 163 self.assertEqual(p.returncode, 47) 164 165 def test_stdin_pipe(self): 166 # stdin redirection 167 p = subprocess.Popen([sys.executable, "-c", 168 'import sys; sys.exit(sys.stdin.read() == "pear")'], 169 stdin=subprocess.PIPE) 170 p.stdin.write("pear") 171 p.stdin.close() 172 p.wait() 173 self.assertEqual(p.returncode, 1) 174 175 def test_stdin_filedes(self): 176 # stdin is set to open file descriptor 177 tf = tempfile.TemporaryFile() 178 d = tf.fileno() 179 os.write(d, "pear") 180 os.lseek(d, 0, 0) 181 p = subprocess.Popen([sys.executable, "-c", 182 'import sys; sys.exit(sys.stdin.read() == "pear")'], 183 stdin=d) 184 p.wait() 185 self.assertEqual(p.returncode, 1) 186 187 def test_stdin_fileobj(self): 188 # stdin is set to open file object 189 tf = tempfile.TemporaryFile() 190 tf.write("pear") 191 tf.seek(0) 192 p = subprocess.Popen([sys.executable, "-c", 193 'import sys; sys.exit(sys.stdin.read() == "pear")'], 194 stdin=tf) 195 p.wait() 196 self.assertEqual(p.returncode, 1) 197 198 def test_stdout_pipe(self): 199 # stdout redirection 200 p = subprocess.Popen([sys.executable, "-c", 201 'import sys; sys.stdout.write("orange")'], 202 stdout=subprocess.PIPE) 203 self.addCleanup(p.stdout.close) 204 self.assertEqual(p.stdout.read(), "orange") 205 206 def test_stdout_filedes(self): 207 # stdout is set to open file descriptor 208 tf = tempfile.TemporaryFile() 209 d = tf.fileno() 210 p = subprocess.Popen([sys.executable, "-c", 211 'import sys; sys.stdout.write("orange")'], 212 stdout=d) 213 p.wait() 214 os.lseek(d, 0, 0) 215 self.assertEqual(os.read(d, 1024), "orange") 216 217 def test_stdout_fileobj(self): 218 # stdout is set to open file object 219 tf = tempfile.TemporaryFile() 220 p = subprocess.Popen([sys.executable, "-c", 221 'import sys; sys.stdout.write("orange")'], 222 stdout=tf) 223 p.wait() 224 tf.seek(0) 225 self.assertEqual(tf.read(), "orange") 226 227 def test_stderr_pipe(self): 228 # stderr redirection 229 p = subprocess.Popen([sys.executable, "-c", 230 'import sys; sys.stderr.write("strawberry")'], 231 stderr=subprocess.PIPE) 232 self.addCleanup(p.stderr.close) 233 self.assertStderrEqual(p.stderr.read(), "strawberry") 234 235 def test_stderr_filedes(self): 236 # stderr is set to open file descriptor 237 tf = tempfile.TemporaryFile() 238 d = tf.fileno() 239 p = subprocess.Popen([sys.executable, "-c", 240 'import sys; sys.stderr.write("strawberry")'], 241 stderr=d) 242 p.wait() 243 os.lseek(d, 0, 0) 244 self.assertStderrEqual(os.read(d, 1024), "strawberry") 245 246 def test_stderr_fileobj(self): 247 # stderr is set to open file object 248 tf = tempfile.TemporaryFile() 249 p = subprocess.Popen([sys.executable, "-c", 250 'import sys; sys.stderr.write("strawberry")'], 251 stderr=tf) 252 p.wait() 253 tf.seek(0) 254 self.assertStderrEqual(tf.read(), "strawberry") 255 256 def test_stdout_stderr_pipe(self): 257 # capture stdout and stderr to the same pipe 258 p = subprocess.Popen([sys.executable, "-c", 259 'import sys;' 260 'sys.stdout.write("apple");' 261 'sys.stdout.flush();' 262 'sys.stderr.write("orange")'], 263 stdout=subprocess.PIPE, 264 stderr=subprocess.STDOUT) 265 self.addCleanup(p.stdout.close) 266 self.assertStderrEqual(p.stdout.read(), "appleorange") 267 268 def test_stdout_stderr_file(self): 269 # capture stdout and stderr to the same open file 270 tf = tempfile.TemporaryFile() 271 p = subprocess.Popen([sys.executable, "-c", 272 'import sys;' 273 'sys.stdout.write("apple");' 274 'sys.stdout.flush();' 275 'sys.stderr.write("orange")'], 276 stdout=tf, 277 stderr=tf) 278 p.wait() 279 tf.seek(0) 280 self.assertStderrEqual(tf.read(), "appleorange") 281 282 def test_stdout_filedes_of_stdout(self): 283 # stdout is set to 1 (#1531862). 284 cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))" 285 rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) 286 self.assertEqual(rc, 2) 287 288 def test_cwd(self): 289 tmpdir = tempfile.gettempdir() 290 # We cannot use os.path.realpath to canonicalize the path, 291 # since it doesn't expand Tru64 {memb} strings. See bug 1063571. 292 cwd = os.getcwd() 293 os.chdir(tmpdir) 294 tmpdir = os.getcwd() 295 os.chdir(cwd) 296 p = subprocess.Popen([sys.executable, "-c", 297 'import sys,os;' 298 'sys.stdout.write(os.getcwd())'], 299 stdout=subprocess.PIPE, 300 cwd=tmpdir) 301 self.addCleanup(p.stdout.close) 302 normcase = os.path.normcase 303 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir)) 304 305 def test_env(self): 306 newenv = os.environ.copy() 307 newenv["FRUIT"] = "orange" 308 p = subprocess.Popen([sys.executable, "-c", 309 'import sys,os;' 310 'sys.stdout.write(os.getenv("FRUIT"))'], 311 stdout=subprocess.PIPE, 312 env=newenv) 313 self.addCleanup(p.stdout.close) 314 self.assertEqual(p.stdout.read(), "orange") 315 316 def test_communicate_stdin(self): 317 p = subprocess.Popen([sys.executable, "-c", 318 'import sys;' 319 'sys.exit(sys.stdin.read() == "pear")'], 320 stdin=subprocess.PIPE) 321 p.communicate("pear") 322 self.assertEqual(p.returncode, 1) 323 324 def test_communicate_stdout(self): 325 p = subprocess.Popen([sys.executable, "-c", 326 'import sys; sys.stdout.write("pineapple")'], 327 stdout=subprocess.PIPE) 328 (stdout, stderr) = p.communicate() 329 self.assertEqual(stdout, "pineapple") 330 self.assertEqual(stderr, None) 331 332 def test_communicate_stderr(self): 333 p = subprocess.Popen([sys.executable, "-c", 334 'import sys; sys.stderr.write("pineapple")'], 335 stderr=subprocess.PIPE) 336 (stdout, stderr) = p.communicate() 337 self.assertEqual(stdout, None) 338 self.assertStderrEqual(stderr, "pineapple") 339 340 def test_communicate(self): 341 p = subprocess.Popen([sys.executable, "-c", 342 'import sys,os;' 343 'sys.stderr.write("pineapple");' 344 'sys.stdout.write(sys.stdin.read())'], 345 stdin=subprocess.PIPE, 346 stdout=subprocess.PIPE, 347 stderr=subprocess.PIPE) 348 self.addCleanup(p.stdout.close) 349 self.addCleanup(p.stderr.close) 350 self.addCleanup(p.stdin.close) 351 (stdout, stderr) = p.communicate("banana") 352 self.assertEqual(stdout, "banana") 353 self.assertStderrEqual(stderr, "pineapple") 354 355 # This test is Linux specific for simplicity to at least have 356 # some coverage. It is not a platform specific bug. 357 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 358 "Linux specific") 359 # Test for the fd leak reported in http://bugs.python.org/issue2791. 360 def test_communicate_pipe_fd_leak(self): 361 fd_directory = '/proc/%d/fd' % os.getpid() 362 num_fds_before_popen = len(os.listdir(fd_directory)) 363 p = subprocess.Popen([sys.executable, "-c", "print()"], 364 stdout=subprocess.PIPE) 365 p.communicate() 366 num_fds_after_communicate = len(os.listdir(fd_directory)) 367 del p 368 num_fds_after_destruction = len(os.listdir(fd_directory)) 369 self.assertEqual(num_fds_before_popen, num_fds_after_destruction) 370 self.assertEqual(num_fds_before_popen, num_fds_after_communicate) 371 372 def test_communicate_returns(self): 373 # communicate() should return None if no redirection is active 374 p = subprocess.Popen([sys.executable, "-c", 375 "import sys; sys.exit(47)"]) 376 (stdout, stderr) = p.communicate() 377 self.assertEqual(stdout, None) 378 self.assertEqual(stderr, None) 379 380 def test_communicate_pipe_buf(self): 381 # communicate() with writes larger than pipe_buf 382 # This test will probably deadlock rather than fail, if 383 # communicate() does not work properly. 384 x, y = os.pipe() 385 if mswindows: 386 pipe_buf = 512 387 else: 388 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") 389 os.close(x) 390 os.close(y) 391 p = subprocess.Popen([sys.executable, "-c", 392 'import sys,os;' 393 'sys.stdout.write(sys.stdin.read(47));' 394 'sys.stderr.write("xyz"*%d);' 395 'sys.stdout.write(sys.stdin.read())' % pipe_buf], 396 stdin=subprocess.PIPE, 397 stdout=subprocess.PIPE, 398 stderr=subprocess.PIPE) 399 self.addCleanup(p.stdout.close) 400 self.addCleanup(p.stderr.close) 401 self.addCleanup(p.stdin.close) 402 string_to_write = "abc"*pipe_buf 403 (stdout, stderr) = p.communicate(string_to_write) 404 self.assertEqual(stdout, string_to_write) 405 406 def test_writes_before_communicate(self): 407 # stdin.write before communicate() 408 p = subprocess.Popen([sys.executable, "-c", 409 'import sys,os;' 410 'sys.stdout.write(sys.stdin.read())'], 411 stdin=subprocess.PIPE, 412 stdout=subprocess.PIPE, 413 stderr=subprocess.PIPE) 414 self.addCleanup(p.stdout.close) 415 self.addCleanup(p.stderr.close) 416 self.addCleanup(p.stdin.close) 417 p.stdin.write("banana") 418 (stdout, stderr) = p.communicate("split") 419 self.assertEqual(stdout, "bananasplit") 420 self.assertStderrEqual(stderr, "") 421 422 def test_universal_newlines(self): 423 p = subprocess.Popen([sys.executable, "-c", 424 'import sys,os;' + SETBINARY + 425 'sys.stdout.write("line1\\n");' 426 'sys.stdout.flush();' 427 'sys.stdout.write("line2\\r");' 428 'sys.stdout.flush();' 429 'sys.stdout.write("line3\\r\\n");' 430 'sys.stdout.flush();' 431 'sys.stdout.write("line4\\r");' 432 'sys.stdout.flush();' 433 'sys.stdout.write("\\nline5");' 434 'sys.stdout.flush();' 435 'sys.stdout.write("\\nline6");'], 436 stdout=subprocess.PIPE, 437 universal_newlines=1) 438 self.addCleanup(p.stdout.close) 439 stdout = p.stdout.read() 440 if hasattr(file, 'newlines'): 441 # Interpreter with universal newline support 442 self.assertEqual(stdout, 443 "line1\nline2\nline3\nline4\nline5\nline6") 444 else: 445 # Interpreter without universal newline support 446 self.assertEqual(stdout, 447 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 448 449 def test_universal_newlines_communicate(self): 450 # universal newlines through communicate() 451 p = subprocess.Popen([sys.executable, "-c", 452 'import sys,os;' + SETBINARY + 453 'sys.stdout.write("line1\\n");' 454 'sys.stdout.flush();' 455 'sys.stdout.write("line2\\r");' 456 'sys.stdout.flush();' 457 'sys.stdout.write("line3\\r\\n");' 458 'sys.stdout.flush();' 459 'sys.stdout.write("line4\\r");' 460 'sys.stdout.flush();' 461 'sys.stdout.write("\\nline5");' 462 'sys.stdout.flush();' 463 'sys.stdout.write("\\nline6");'], 464 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 465 universal_newlines=1) 466 self.addCleanup(p.stdout.close) 467 self.addCleanup(p.stderr.close) 468 (stdout, stderr) = p.communicate() 469 if hasattr(file, 'newlines'): 470 # Interpreter with universal newline support 471 self.assertEqual(stdout, 472 "line1\nline2\nline3\nline4\nline5\nline6") 473 else: 474 # Interpreter without universal newline support 475 self.assertEqual(stdout, 476 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 477 478 def test_no_leaking(self): 479 # Make sure we leak no resources 480 if not mswindows: 481 max_handles = 1026 # too much for most UNIX systems 482 else: 483 max_handles = 2050 # too much for (at least some) Windows setups 484 handles = [] 485 try: 486 for i in range(max_handles): 487 try: 488 handles.append(os.open(test_support.TESTFN, 489 os.O_WRONLY | os.O_CREAT)) 490 except OSError as e: 491 if e.errno != errno.EMFILE: 492 raise 493 break 494 else: 495 self.skipTest("failed to reach the file descriptor limit " 496 "(tried %d)" % max_handles) 497 # Close a couple of them (should be enough for a subprocess) 498 for i in range(10): 499 os.close(handles.pop()) 500 # Loop creating some subprocesses. If one of them leaks some fds, 501 # the next loop iteration will fail by reaching the max fd limit. 502 for i in range(15): 503 p = subprocess.Popen([sys.executable, "-c", 504 "import sys;" 505 "sys.stdout.write(sys.stdin.read())"], 506 stdin=subprocess.PIPE, 507 stdout=subprocess.PIPE, 508 stderr=subprocess.PIPE) 509 data = p.communicate(b"lime")[0] 510 self.assertEqual(data, b"lime") 511 finally: 512 for h in handles: 513 os.close(h) 514 515 def test_list2cmdline(self): 516 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 517 '"a b c" d e') 518 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 519 'ab\\"c \\ d') 520 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 521 'ab\\"c " \\\\" d') 522 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 523 'a\\\\\\b "de fg" h') 524 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 525 'a\\\\\\"b c d') 526 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 527 '"a\\\\b c" d e') 528 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 529 '"a\\\\b\\ c" d e') 530 self.assertEqual(subprocess.list2cmdline(['ab', '']), 531 'ab ""') 532 533 534 def test_poll(self): 535 p = subprocess.Popen([sys.executable, 536 "-c", "import time; time.sleep(1)"]) 537 count = 0 538 while p.poll() is None: 539 time.sleep(0.1) 540 count += 1 541 # We expect that the poll loop probably went around about 10 times, 542 # but, based on system scheduling we can't control, it's possible 543 # poll() never returned None. It "should be" very rare that it 544 # didn't go around at least twice. 545 self.assertGreaterEqual(count, 2) 546 # Subsequent invocations should just return the returncode 547 self.assertEqual(p.poll(), 0) 548 549 550 def test_wait(self): 551 p = subprocess.Popen([sys.executable, 552 "-c", "import time; time.sleep(2)"]) 553 self.assertEqual(p.wait(), 0) 554 # Subsequent invocations should just return the returncode 555 self.assertEqual(p.wait(), 0) 556 557 558 def test_invalid_bufsize(self): 559 # an invalid type of the bufsize argument should raise 560 # TypeError. 561 with self.assertRaises(TypeError): 562 subprocess.Popen([sys.executable, "-c", "pass"], "orange") 563 564 def test_leaking_fds_on_error(self): 565 # see bug #5179: Popen leaks file descriptors to PIPEs if 566 # the child fails to execute; this will eventually exhaust 567 # the maximum number of open fds. 1024 seems a very common 568 # value for that limit, but Windows has 2048, so we loop 569 # 1024 times (each call leaked two fds). 570 for i in range(1024): 571 # Windows raises IOError. Others raise OSError. 572 with self.assertRaises(EnvironmentError) as c: 573 subprocess.Popen(['nonexisting_i_hope'], 574 stdout=subprocess.PIPE, 575 stderr=subprocess.PIPE) 576 # ignore errors that indicate the command was not found 577 if c.exception.errno not in (errno.ENOENT, errno.EACCES): 578 raise c.exception 579 580 def test_handles_closed_on_exception(self): 581 # If CreateProcess exits with an error, ensure the 582 # duplicate output handles are released 583 ifhandle, ifname = mkstemp() 584 ofhandle, ofname = mkstemp() 585 efhandle, efname = mkstemp() 586 try: 587 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 588 stderr=efhandle) 589 except OSError: 590 os.close(ifhandle) 591 os.remove(ifname) 592 os.close(ofhandle) 593 os.remove(ofname) 594 os.close(efhandle) 595 os.remove(efname) 596 self.assertFalse(os.path.exists(ifname)) 597 self.assertFalse(os.path.exists(ofname)) 598 self.assertFalse(os.path.exists(efname)) 599 600 def test_communicate_epipe(self): 601 # Issue 10963: communicate() should hide EPIPE 602 p = subprocess.Popen([sys.executable, "-c", 'pass'], 603 stdin=subprocess.PIPE, 604 stdout=subprocess.PIPE, 605 stderr=subprocess.PIPE) 606 self.addCleanup(p.stdout.close) 607 self.addCleanup(p.stderr.close) 608 self.addCleanup(p.stdin.close) 609 p.communicate("x" * 2**20) 610 611 def test_communicate_epipe_only_stdin(self): 612 # Issue 10963: communicate() should hide EPIPE 613 p = subprocess.Popen([sys.executable, "-c", 'pass'], 614 stdin=subprocess.PIPE) 615 self.addCleanup(p.stdin.close) 616 time.sleep(2) 617 p.communicate("x" * 2**20) 618 619 # context manager 620 class _SuppressCoreFiles(object): 621 """Try to prevent core files from being created.""" 622 old_limit = None 623 624 def __enter__(self): 625 """Try to save previous ulimit, then set it to (0, 0).""" 626 try: 627 import resource 628 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) 629 resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) 630 except (ImportError, ValueError, resource.error): 631 pass 632 633 if sys.platform == 'darwin': 634 # Check if the 'Crash Reporter' on OSX was configured 635 # in 'Developer' mode and warn that it will get triggered 636 # when it is. 637 # 638 # This assumes that this context manager is used in tests 639 # that might trigger the next manager. 640 value = subprocess.Popen(['/usr/bin/defaults', 'read', 641 'com.apple.CrashReporter', 'DialogType'], 642 stdout=subprocess.PIPE).communicate()[0] 643 if value.strip() == b'developer': 644 print "this tests triggers the Crash Reporter, that is intentional" 645 sys.stdout.flush() 646 647 def __exit__(self, *args): 648 """Return core file behavior to default.""" 649 if self.old_limit is None: 650 return 651 try: 652 import resource 653 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) 654 except (ImportError, ValueError, resource.error): 655 pass 656 657 658 @unittest.skipIf(mswindows, "POSIX specific tests") 659 class POSIXProcessTestCase(BaseTestCase): 660 661 def test_exceptions(self): 662 # caught & re-raised exceptions 663 with self.assertRaises(OSError) as c: 664 p = subprocess.Popen([sys.executable, "-c", ""], 665 cwd="/this/path/does/not/exist") 666 # The attribute child_traceback should contain "os.chdir" somewhere. 667 self.assertIn("os.chdir", c.exception.child_traceback) 668 669 def test_run_abort(self): 670 # returncode handles signal termination 671 with _SuppressCoreFiles(): 672 p = subprocess.Popen([sys.executable, "-c", 673 "import os; os.abort()"]) 674 p.wait() 675 self.assertEqual(-p.returncode, signal.SIGABRT) 676 677 def test_preexec(self): 678 # preexec function 679 p = subprocess.Popen([sys.executable, "-c", 680 "import sys, os;" 681 "sys.stdout.write(os.getenv('FRUIT'))"], 682 stdout=subprocess.PIPE, 683 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 684 self.addCleanup(p.stdout.close) 685 self.assertEqual(p.stdout.read(), "apple") 686 687 def test_args_string(self): 688 # args is a string 689 f, fname = mkstemp() 690 os.write(f, "#!/bin/sh\n") 691 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 692 sys.executable) 693 os.close(f) 694 os.chmod(fname, 0o700) 695 p = subprocess.Popen(fname) 696 p.wait() 697 os.remove(fname) 698 self.assertEqual(p.returncode, 47) 699 700 def test_invalid_args(self): 701 # invalid arguments should raise ValueError 702 self.assertRaises(ValueError, subprocess.call, 703 [sys.executable, "-c", 704 "import sys; sys.exit(47)"], 705 startupinfo=47) 706 self.assertRaises(ValueError, subprocess.call, 707 [sys.executable, "-c", 708 "import sys; sys.exit(47)"], 709 creationflags=47) 710 711 def test_shell_sequence(self): 712 # Run command through the shell (sequence) 713 newenv = os.environ.copy() 714 newenv["FRUIT"] = "apple" 715 p = subprocess.Popen(["echo $FRUIT"], shell=1, 716 stdout=subprocess.PIPE, 717 env=newenv) 718 self.addCleanup(p.stdout.close) 719 self.assertEqual(p.stdout.read().strip(), "apple") 720 721 def test_shell_string(self): 722 # Run command through the shell (string) 723 newenv = os.environ.copy() 724 newenv["FRUIT"] = "apple" 725 p = subprocess.Popen("echo $FRUIT", shell=1, 726 stdout=subprocess.PIPE, 727 env=newenv) 728 self.addCleanup(p.stdout.close) 729 self.assertEqual(p.stdout.read().strip(), "apple") 730 731 def test_call_string(self): 732 # call() function with string argument on UNIX 733 f, fname = mkstemp() 734 os.write(f, "#!/bin/sh\n") 735 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 736 sys.executable) 737 os.close(f) 738 os.chmod(fname, 0700) 739 rc = subprocess.call(fname) 740 os.remove(fname) 741 self.assertEqual(rc, 47) 742 743 def test_specific_shell(self): 744 # Issue #9265: Incorrect name passed as arg[0]. 745 shells = [] 746 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 747 for name in ['bash', 'ksh']: 748 sh = os.path.join(prefix, name) 749 if os.path.isfile(sh): 750 shells.append(sh) 751 if not shells: # Will probably work for any shell but csh. 752 self.skipTest("bash or ksh required for this test") 753 sh = '/bin/sh' 754 if os.path.isfile(sh) and not os.path.islink(sh): 755 # Test will fail if /bin/sh is a symlink to csh. 756 shells.append(sh) 757 for sh in shells: 758 p = subprocess.Popen("echo $0", executable=sh, shell=True, 759 stdout=subprocess.PIPE) 760 self.addCleanup(p.stdout.close) 761 self.assertEqual(p.stdout.read().strip(), sh) 762 763 def _kill_process(self, method, *args): 764 # Do not inherit file handles from the parent. 765 # It should fix failures on some platforms. 766 p = subprocess.Popen([sys.executable, "-c", """if 1: 767 import sys, time 768 sys.stdout.write('x\\n') 769 sys.stdout.flush() 770 time.sleep(30) 771 """], 772 close_fds=True, 773 stdin=subprocess.PIPE, 774 stdout=subprocess.PIPE, 775 stderr=subprocess.PIPE) 776 # Wait for the interpreter to be completely initialized before 777 # sending any signal. 778 p.stdout.read(1) 779 getattr(p, method)(*args) 780 return p 781 782 def test_send_signal(self): 783 p = self._kill_process('send_signal', signal.SIGINT) 784 _, stderr = p.communicate() 785 self.assertIn('KeyboardInterrupt', stderr) 786 self.assertNotEqual(p.wait(), 0) 787 788 def test_kill(self): 789 p = self._kill_process('kill') 790 _, stderr = p.communicate() 791 self.assertStderrEqual(stderr, '') 792 self.assertEqual(p.wait(), -signal.SIGKILL) 793 794 def test_terminate(self): 795 p = self._kill_process('terminate') 796 _, stderr = p.communicate() 797 self.assertStderrEqual(stderr, '') 798 self.assertEqual(p.wait(), -signal.SIGTERM) 799 800 def check_close_std_fds(self, fds): 801 # Issue #9905: test that subprocess pipes still work properly with 802 # some standard fds closed 803 stdin = 0 804 newfds = [] 805 for a in fds: 806 b = os.dup(a) 807 newfds.append(b) 808 if a == 0: 809 stdin = b 810 try: 811 for fd in fds: 812 os.close(fd) 813 out, err = subprocess.Popen([sys.executable, "-c", 814 'import sys;' 815 'sys.stdout.write("apple");' 816 'sys.stdout.flush();' 817 'sys.stderr.write("orange")'], 818 stdin=stdin, 819 stdout=subprocess.PIPE, 820 stderr=subprocess.PIPE).communicate() 821 err = test_support.strip_python_stderr(err) 822 self.assertEqual((out, err), (b'apple', b'orange')) 823 finally: 824 for b, a in zip(newfds, fds): 825 os.dup2(b, a) 826 for b in newfds: 827 os.close(b) 828 829 def test_close_fd_0(self): 830 self.check_close_std_fds([0]) 831 832 def test_close_fd_1(self): 833 self.check_close_std_fds([1]) 834 835 def test_close_fd_2(self): 836 self.check_close_std_fds([2]) 837 838 def test_close_fds_0_1(self): 839 self.check_close_std_fds([0, 1]) 840 841 def test_close_fds_0_2(self): 842 self.check_close_std_fds([0, 2]) 843 844 def test_close_fds_1_2(self): 845 self.check_close_std_fds([1, 2]) 846 847 def test_close_fds_0_1_2(self): 848 # Issue #10806: test that subprocess pipes still work properly with 849 # all standard fds closed. 850 self.check_close_std_fds([0, 1, 2]) 851 852 def test_wait_when_sigchild_ignored(self): 853 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 854 sigchild_ignore = test_support.findfile("sigchild_ignore.py", 855 subdir="subprocessdata") 856 p = subprocess.Popen([sys.executable, sigchild_ignore], 857 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 858 stdout, stderr = p.communicate() 859 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 860 " non-zero with this error:\n%s" % stderr) 861 862 863 @unittest.skipUnless(mswindows, "Windows specific tests") 864 class Win32ProcessTestCase(BaseTestCase): 865 866 def test_startupinfo(self): 867 # startupinfo argument 868 # We uses hardcoded constants, because we do not want to 869 # depend on win32all. 870 STARTF_USESHOWWINDOW = 1 871 SW_MAXIMIZE = 3 872 startupinfo = subprocess.STARTUPINFO() 873 startupinfo.dwFlags = STARTF_USESHOWWINDOW 874 startupinfo.wShowWindow = SW_MAXIMIZE 875 # Since Python is a console process, it won't be affected 876 # by wShowWindow, but the argument should be silently 877 # ignored 878 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], 879 startupinfo=startupinfo) 880 881 def test_creationflags(self): 882 # creationflags argument 883 CREATE_NEW_CONSOLE = 16 884 sys.stderr.write(" a DOS box should flash briefly ...\n") 885 subprocess.call(sys.executable + 886 ' -c "import time; time.sleep(0.25)"', 887 creationflags=CREATE_NEW_CONSOLE) 888 889 def test_invalid_args(self): 890 # invalid arguments should raise ValueError 891 self.assertRaises(ValueError, subprocess.call, 892 [sys.executable, "-c", 893 "import sys; sys.exit(47)"], 894 preexec_fn=lambda: 1) 895 self.assertRaises(ValueError, subprocess.call, 896 [sys.executable, "-c", 897 "import sys; sys.exit(47)"], 898 stdout=subprocess.PIPE, 899 close_fds=True) 900 901 def test_close_fds(self): 902 # close file descriptors 903 rc = subprocess.call([sys.executable, "-c", 904 "import sys; sys.exit(47)"], 905 close_fds=True) 906 self.assertEqual(rc, 47) 907 908 def test_shell_sequence(self): 909 # Run command through the shell (sequence) 910 newenv = os.environ.copy() 911 newenv["FRUIT"] = "physalis" 912 p = subprocess.Popen(["set"], shell=1, 913 stdout=subprocess.PIPE, 914 env=newenv) 915 self.addCleanup(p.stdout.close) 916 self.assertIn("physalis", p.stdout.read()) 917 918 def test_shell_string(self): 919 # Run command through the shell (string) 920 newenv = os.environ.copy() 921 newenv["FRUIT"] = "physalis" 922 p = subprocess.Popen("set", shell=1, 923 stdout=subprocess.PIPE, 924 env=newenv) 925 self.addCleanup(p.stdout.close) 926 self.assertIn("physalis", p.stdout.read()) 927 928 def test_call_string(self): 929 # call() function with string argument on Windows 930 rc = subprocess.call(sys.executable + 931 ' -c "import sys; sys.exit(47)"') 932 self.assertEqual(rc, 47) 933 934 def _kill_process(self, method, *args): 935 # Some win32 buildbot raises EOFError if stdin is inherited 936 p = subprocess.Popen([sys.executable, "-c", """if 1: 937 import sys, time 938 sys.stdout.write('x\\n') 939 sys.stdout.flush() 940 time.sleep(30) 941 """], 942 stdin=subprocess.PIPE, 943 stdout=subprocess.PIPE, 944 stderr=subprocess.PIPE) 945 self.addCleanup(p.stdout.close) 946 self.addCleanup(p.stderr.close) 947 self.addCleanup(p.stdin.close) 948 # Wait for the interpreter to be completely initialized before 949 # sending any signal. 950 p.stdout.read(1) 951 getattr(p, method)(*args) 952 _, stderr = p.communicate() 953 self.assertStderrEqual(stderr, '') 954 returncode = p.wait() 955 self.assertNotEqual(returncode, 0) 956 957 def test_send_signal(self): 958 self._kill_process('send_signal', signal.SIGTERM) 959 960 def test_kill(self): 961 self._kill_process('kill') 962 963 def test_terminate(self): 964 self._kill_process('terminate') 965 966 967 @unittest.skipUnless(getattr(subprocess, '_has_poll', False), 968 "poll system call not supported") 969 class ProcessTestCaseNoPoll(ProcessTestCase): 970 def setUp(self): 971 subprocess._has_poll = False 972 ProcessTestCase.setUp(self) 973 974 def tearDown(self): 975 subprocess._has_poll = True 976 ProcessTestCase.tearDown(self) 977 978 979 class HelperFunctionTests(unittest.TestCase): 980 @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") 981 def test_eintr_retry_call(self): 982 record_calls = [] 983 def fake_os_func(*args): 984 record_calls.append(args) 985 if len(record_calls) == 2: 986 raise OSError(errno.EINTR, "fake interrupted system call") 987 return tuple(reversed(args)) 988 989 self.assertEqual((999, 256), 990 subprocess._eintr_retry_call(fake_os_func, 256, 999)) 991 self.assertEqual([(256, 999)], record_calls) 992 # This time there will be an EINTR so it will loop once. 993 self.assertEqual((666,), 994 subprocess._eintr_retry_call(fake_os_func, 666)) 995 self.assertEqual([(256, 999), (666,), (666,)], record_calls) 996 997 998 @unittest.skipUnless(mswindows, "mswindows only") 999 class CommandsWithSpaces (BaseTestCase): 1000 1001 def setUp(self): 1002 super(CommandsWithSpaces, self).setUp() 1003 f, fname = mkstemp(".py", "te st") 1004 self.fname = fname.lower () 1005 os.write(f, b"import sys;" 1006 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 1007 ) 1008 os.close(f) 1009 1010 def tearDown(self): 1011 os.remove(self.fname) 1012 super(CommandsWithSpaces, self).tearDown() 1013 1014 def with_spaces(self, *args, **kwargs): 1015 kwargs['stdout'] = subprocess.PIPE 1016 p = subprocess.Popen(*args, **kwargs) 1017 self.addCleanup(p.stdout.close) 1018 self.assertEqual( 1019 p.stdout.read ().decode("mbcs"), 1020 "2 [%r, 'ab cd']" % self.fname 1021 ) 1022 1023 def test_shell_string_with_spaces(self): 1024 # call() function with string argument with spaces on Windows 1025 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1026 "ab cd"), shell=1) 1027 1028 def test_shell_sequence_with_spaces(self): 1029 # call() function with sequence argument with spaces on Windows 1030 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 1031 1032 def test_noshell_string_with_spaces(self): 1033 # call() function with string argument with spaces on Windows 1034 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1035 "ab cd")) 1036 1037 def test_noshell_sequence_with_spaces(self): 1038 # call() function with sequence argument with spaces on Windows 1039 self.with_spaces([sys.executable, self.fname, "ab cd"]) 1040 1041 def test_main(): 1042 unit_tests = (ProcessTestCase, 1043 POSIXProcessTestCase, 1044 Win32ProcessTestCase, 1045 ProcessTestCaseNoPoll, 1046 HelperFunctionTests, 1047 CommandsWithSpaces) 1048 1049 test_support.run_unittest(*unit_tests) 1050 test_support.reap_children() 1051 1052 if __name__ == "__main__": 1053 test_main() 1054