1 # tempfile.py unit tests. 2 import tempfile 3 import errno 4 import io 5 import os 6 import signal 7 import sys 8 import re 9 import warnings 10 import contextlib 11 import weakref 12 from unittest import mock 13 14 import unittest 15 from test import support 16 from test.support import script_helper 17 18 19 if hasattr(os, 'stat'): 20 import stat 21 has_stat = 1 22 else: 23 has_stat = 0 24 25 has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 26 has_spawnl = hasattr(os, 'spawnl') 27 28 # TEST_FILES may need to be tweaked for systems depending on the maximum 29 # number of files that can be opened at one time (see ulimit -n) 30 if sys.platform.startswith('openbsd'): 31 TEST_FILES = 48 32 else: 33 TEST_FILES = 100 34 35 # This is organized as one test for each chunk of code in tempfile.py, 36 # in order of their appearance in the file. Testing which requires 37 # threads is not done here. 38 39 class TestLowLevelInternals(unittest.TestCase): 40 def test_infer_return_type_singles(self): 41 self.assertIs(str, tempfile._infer_return_type('')) 42 self.assertIs(bytes, tempfile._infer_return_type(b'')) 43 self.assertIs(str, tempfile._infer_return_type(None)) 44 45 def test_infer_return_type_multiples(self): 46 self.assertIs(str, tempfile._infer_return_type('', '')) 47 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 48 with self.assertRaises(TypeError): 49 tempfile._infer_return_type('', b'') 50 with self.assertRaises(TypeError): 51 tempfile._infer_return_type(b'', '') 52 53 def test_infer_return_type_multiples_and_none(self): 54 self.assertIs(str, tempfile._infer_return_type(None, '')) 55 self.assertIs(str, tempfile._infer_return_type('', None)) 56 self.assertIs(str, tempfile._infer_return_type(None, None)) 57 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 58 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 59 with self.assertRaises(TypeError): 60 tempfile._infer_return_type('', None, b'') 61 with self.assertRaises(TypeError): 62 tempfile._infer_return_type(b'', None, '') 63 64 65 # Common functionality. 66 67 class BaseTestCase(unittest.TestCase): 68 69 str_check = re.compile(r"^[a-z0-9_-]{8}$") 70 b_check = re.compile(br"^[a-z0-9_-]{8}$") 71 72 def setUp(self): 73 self._warnings_manager = support.check_warnings() 74 self._warnings_manager.__enter__() 75 warnings.filterwarnings("ignore", category=RuntimeWarning, 76 message="mktemp", module=__name__) 77 78 def tearDown(self): 79 self._warnings_manager.__exit__(None, None, None) 80 81 82 def nameCheck(self, name, dir, pre, suf): 83 (ndir, nbase) = os.path.split(name) 84 npre = nbase[:len(pre)] 85 nsuf = nbase[len(nbase)-len(suf):] 86 87 if dir is not None: 88 self.assertIs(type(name), str if type(dir) is str else bytes, 89 "unexpected return type") 90 if pre is not None: 91 self.assertIs(type(name), str if type(pre) is str else bytes, 92 "unexpected return type") 93 if suf is not None: 94 self.assertIs(type(name), str if type(suf) is str else bytes, 95 "unexpected return type") 96 if (dir, pre, suf) == (None, None, None): 97 self.assertIs(type(name), str, "default return type must be str") 98 99 # check for equality of the absolute paths! 100 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 101 "file %r not in directory %r" % (name, dir)) 102 self.assertEqual(npre, pre, 103 "file %r does not begin with %r" % (nbase, pre)) 104 self.assertEqual(nsuf, suf, 105 "file %r does not end with %r" % (nbase, suf)) 106 107 nbase = nbase[len(pre):len(nbase)-len(suf)] 108 check = self.str_check if isinstance(nbase, str) else self.b_check 109 self.assertTrue(check.match(nbase), 110 "random characters %r do not match %r" 111 % (nbase, check.pattern)) 112 113 114 class TestExports(BaseTestCase): 115 def test_exports(self): 116 # There are no surprising symbols in the tempfile module 117 dict = tempfile.__dict__ 118 119 expected = { 120 "NamedTemporaryFile" : 1, 121 "TemporaryFile" : 1, 122 "mkstemp" : 1, 123 "mkdtemp" : 1, 124 "mktemp" : 1, 125 "TMP_MAX" : 1, 126 "gettempprefix" : 1, 127 "gettempprefixb" : 1, 128 "gettempdir" : 1, 129 "gettempdirb" : 1, 130 "tempdir" : 1, 131 "template" : 1, 132 "SpooledTemporaryFile" : 1, 133 "TemporaryDirectory" : 1, 134 } 135 136 unexp = [] 137 for key in dict: 138 if key[0] != '_' and key not in expected: 139 unexp.append(key) 140 self.assertTrue(len(unexp) == 0, 141 "unexpected keys: %s" % unexp) 142 143 144 class TestRandomNameSequence(BaseTestCase): 145 """Test the internal iterator object _RandomNameSequence.""" 146 147 def setUp(self): 148 self.r = tempfile._RandomNameSequence() 149 super().setUp() 150 151 def test_get_six_char_str(self): 152 # _RandomNameSequence returns a six-character string 153 s = next(self.r) 154 self.nameCheck(s, '', '', '') 155 156 def test_many(self): 157 # _RandomNameSequence returns no duplicate strings (stochastic) 158 159 dict = {} 160 r = self.r 161 for i in range(TEST_FILES): 162 s = next(r) 163 self.nameCheck(s, '', '', '') 164 self.assertNotIn(s, dict) 165 dict[s] = 1 166 167 def supports_iter(self): 168 # _RandomNameSequence supports the iterator protocol 169 170 i = 0 171 r = self.r 172 for s in r: 173 i += 1 174 if i == 20: 175 break 176 177 @unittest.skipUnless(hasattr(os, 'fork'), 178 "os.fork is required for this test") 179 def test_process_awareness(self): 180 # ensure that the random source differs between 181 # child and parent. 182 read_fd, write_fd = os.pipe() 183 pid = None 184 try: 185 pid = os.fork() 186 if not pid: 187 os.close(read_fd) 188 os.write(write_fd, next(self.r).encode("ascii")) 189 os.close(write_fd) 190 # bypass the normal exit handlers- leave those to 191 # the parent. 192 os._exit(0) 193 parent_value = next(self.r) 194 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 195 finally: 196 if pid: 197 # best effort to ensure the process can't bleed out 198 # via any bugs above 199 try: 200 os.kill(pid, signal.SIGKILL) 201 except OSError: 202 pass 203 os.close(read_fd) 204 os.close(write_fd) 205 self.assertNotEqual(child_value, parent_value) 206 207 208 209 class TestCandidateTempdirList(BaseTestCase): 210 """Test the internal function _candidate_tempdir_list.""" 211 212 def test_nonempty_list(self): 213 # _candidate_tempdir_list returns a nonempty list of strings 214 215 cand = tempfile._candidate_tempdir_list() 216 217 self.assertFalse(len(cand) == 0) 218 for c in cand: 219 self.assertIsInstance(c, str) 220 221 def test_wanted_dirs(self): 222 # _candidate_tempdir_list contains the expected directories 223 224 # Make sure the interesting environment variables are all set. 225 with support.EnvironmentVarGuard() as env: 226 for envname in 'TMPDIR', 'TEMP', 'TMP': 227 dirname = os.getenv(envname) 228 if not dirname: 229 env[envname] = os.path.abspath(envname) 230 231 cand = tempfile._candidate_tempdir_list() 232 233 for envname in 'TMPDIR', 'TEMP', 'TMP': 234 dirname = os.getenv(envname) 235 if not dirname: raise ValueError 236 self.assertIn(dirname, cand) 237 238 try: 239 dirname = os.getcwd() 240 except (AttributeError, OSError): 241 dirname = os.curdir 242 243 self.assertIn(dirname, cand) 244 245 # Not practical to try to verify the presence of OS-specific 246 # paths in this list. 247 248 249 # We test _get_default_tempdir some more by testing gettempdir. 250 251 class TestGetDefaultTempdir(BaseTestCase): 252 """Test _get_default_tempdir().""" 253 254 def test_no_files_left_behind(self): 255 # use a private empty directory 256 with tempfile.TemporaryDirectory() as our_temp_directory: 257 # force _get_default_tempdir() to consider our empty directory 258 def our_candidate_list(): 259 return [our_temp_directory] 260 261 with support.swap_attr(tempfile, "_candidate_tempdir_list", 262 our_candidate_list): 263 # verify our directory is empty after _get_default_tempdir() 264 tempfile._get_default_tempdir() 265 self.assertEqual(os.listdir(our_temp_directory), []) 266 267 def raise_OSError(*args, **kwargs): 268 raise OSError() 269 270 with support.swap_attr(io, "open", raise_OSError): 271 # test again with failing io.open() 272 with self.assertRaises(FileNotFoundError): 273 tempfile._get_default_tempdir() 274 self.assertEqual(os.listdir(our_temp_directory), []) 275 276 open = io.open 277 def bad_writer(*args, **kwargs): 278 fp = open(*args, **kwargs) 279 fp.write = raise_OSError 280 return fp 281 282 with support.swap_attr(io, "open", bad_writer): 283 # test again with failing write() 284 with self.assertRaises(FileNotFoundError): 285 tempfile._get_default_tempdir() 286 self.assertEqual(os.listdir(our_temp_directory), []) 287 288 289 class TestGetCandidateNames(BaseTestCase): 290 """Test the internal function _get_candidate_names.""" 291 292 def test_retval(self): 293 # _get_candidate_names returns a _RandomNameSequence object 294 obj = tempfile._get_candidate_names() 295 self.assertIsInstance(obj, tempfile._RandomNameSequence) 296 297 def test_same_thing(self): 298 # _get_candidate_names always returns the same object 299 a = tempfile._get_candidate_names() 300 b = tempfile._get_candidate_names() 301 302 self.assertTrue(a is b) 303 304 305 @contextlib.contextmanager 306 def _inside_empty_temp_dir(): 307 dir = tempfile.mkdtemp() 308 try: 309 with support.swap_attr(tempfile, 'tempdir', dir): 310 yield 311 finally: 312 support.rmtree(dir) 313 314 315 def _mock_candidate_names(*names): 316 return support.swap_attr(tempfile, 317 '_get_candidate_names', 318 lambda: iter(names)) 319 320 321 class TestBadTempdir: 322 323 def test_read_only_directory(self): 324 with _inside_empty_temp_dir(): 325 oldmode = mode = os.stat(tempfile.tempdir).st_mode 326 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 327 os.chmod(tempfile.tempdir, mode) 328 try: 329 if os.access(tempfile.tempdir, os.W_OK): 330 self.skipTest("can't set the directory read-only") 331 with self.assertRaises(PermissionError): 332 self.make_temp() 333 self.assertEqual(os.listdir(tempfile.tempdir), []) 334 finally: 335 os.chmod(tempfile.tempdir, oldmode) 336 337 def test_nonexisting_directory(self): 338 with _inside_empty_temp_dir(): 339 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 340 with support.swap_attr(tempfile, 'tempdir', tempdir): 341 with self.assertRaises(FileNotFoundError): 342 self.make_temp() 343 344 def test_non_directory(self): 345 with _inside_empty_temp_dir(): 346 tempdir = os.path.join(tempfile.tempdir, 'file') 347 open(tempdir, 'wb').close() 348 with support.swap_attr(tempfile, 'tempdir', tempdir): 349 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 350 self.make_temp() 351 352 353 class TestMkstempInner(TestBadTempdir, BaseTestCase): 354 """Test the internal function _mkstemp_inner.""" 355 356 class mkstemped: 357 _bflags = tempfile._bin_openflags 358 _tflags = tempfile._text_openflags 359 _close = os.close 360 _unlink = os.unlink 361 362 def __init__(self, dir, pre, suf, bin): 363 if bin: flags = self._bflags 364 else: flags = self._tflags 365 366 output_type = tempfile._infer_return_type(dir, pre, suf) 367 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 368 369 def write(self, str): 370 os.write(self.fd, str) 371 372 def __del__(self): 373 self._close(self.fd) 374 self._unlink(self.name) 375 376 def do_create(self, dir=None, pre=None, suf=None, bin=1): 377 output_type = tempfile._infer_return_type(dir, pre, suf) 378 if dir is None: 379 if output_type is str: 380 dir = tempfile.gettempdir() 381 else: 382 dir = tempfile.gettempdirb() 383 if pre is None: 384 pre = output_type() 385 if suf is None: 386 suf = output_type() 387 file = self.mkstemped(dir, pre, suf, bin) 388 389 self.nameCheck(file.name, dir, pre, suf) 390 return file 391 392 def test_basic(self): 393 # _mkstemp_inner can create files 394 self.do_create().write(b"blat") 395 self.do_create(pre="a").write(b"blat") 396 self.do_create(suf="b").write(b"blat") 397 self.do_create(pre="a", suf="b").write(b"blat") 398 self.do_create(pre="aa", suf=".txt").write(b"blat") 399 400 def test_basic_with_bytes_names(self): 401 # _mkstemp_inner can create files when given name parts all 402 # specified as bytes. 403 dir_b = tempfile.gettempdirb() 404 self.do_create(dir=dir_b, suf=b"").write(b"blat") 405 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 406 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 407 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 408 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 409 # Can't mix str & binary types in the args. 410 with self.assertRaises(TypeError): 411 self.do_create(dir="", suf=b"").write(b"blat") 412 with self.assertRaises(TypeError): 413 self.do_create(dir=dir_b, pre="").write(b"blat") 414 with self.assertRaises(TypeError): 415 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 416 417 def test_basic_many(self): 418 # _mkstemp_inner can create many files (stochastic) 419 extant = list(range(TEST_FILES)) 420 for i in extant: 421 extant[i] = self.do_create(pre="aa") 422 423 def test_choose_directory(self): 424 # _mkstemp_inner can create files in a user-selected directory 425 dir = tempfile.mkdtemp() 426 try: 427 self.do_create(dir=dir).write(b"blat") 428 finally: 429 os.rmdir(dir) 430 431 @unittest.skipUnless(has_stat, 'os.stat not available') 432 def test_file_mode(self): 433 # _mkstemp_inner creates files with the proper mode 434 435 file = self.do_create() 436 mode = stat.S_IMODE(os.stat(file.name).st_mode) 437 expected = 0o600 438 if sys.platform == 'win32': 439 # There's no distinction among 'user', 'group' and 'world'; 440 # replicate the 'user' bits. 441 user = expected >> 6 442 expected = user * (1 + 8 + 64) 443 self.assertEqual(mode, expected) 444 445 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 446 def test_noinherit(self): 447 # _mkstemp_inner file handles are not inherited by child processes 448 449 if support.verbose: 450 v="v" 451 else: 452 v="q" 453 454 file = self.do_create() 455 self.assertEqual(os.get_inheritable(file.fd), False) 456 fd = "%d" % file.fd 457 458 try: 459 me = __file__ 460 except NameError: 461 me = sys.argv[0] 462 463 # We have to exec something, so that FD_CLOEXEC will take 464 # effect. The core of this test is therefore in 465 # tf_inherit_check.py, which see. 466 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 467 "tf_inherit_check.py") 468 469 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 470 # but an arg with embedded spaces should be decorated with double 471 # quotes on each end 472 if sys.platform == 'win32': 473 decorated = '"%s"' % sys.executable 474 tester = '"%s"' % tester 475 else: 476 decorated = sys.executable 477 478 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 479 self.assertFalse(retval < 0, 480 "child process caught fatal signal %d" % -retval) 481 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 482 483 @unittest.skipUnless(has_textmode, "text mode not available") 484 def test_textmode(self): 485 # _mkstemp_inner can create files in text mode 486 487 # A text file is truncated at the first Ctrl+Z byte 488 f = self.do_create(bin=0) 489 f.write(b"blat\x1a") 490 f.write(b"extra\n") 491 os.lseek(f.fd, 0, os.SEEK_SET) 492 self.assertEqual(os.read(f.fd, 20), b"blat") 493 494 def make_temp(self): 495 return tempfile._mkstemp_inner(tempfile.gettempdir(), 496 tempfile.gettempprefix(), 497 '', 498 tempfile._bin_openflags, 499 str) 500 501 def test_collision_with_existing_file(self): 502 # _mkstemp_inner tries another name when a file with 503 # the chosen name already exists 504 with _inside_empty_temp_dir(), \ 505 _mock_candidate_names('aaa', 'aaa', 'bbb'): 506 (fd1, name1) = self.make_temp() 507 os.close(fd1) 508 self.assertTrue(name1.endswith('aaa')) 509 510 (fd2, name2) = self.make_temp() 511 os.close(fd2) 512 self.assertTrue(name2.endswith('bbb')) 513 514 def test_collision_with_existing_directory(self): 515 # _mkstemp_inner tries another name when a directory with 516 # the chosen name already exists 517 with _inside_empty_temp_dir(), \ 518 _mock_candidate_names('aaa', 'aaa', 'bbb'): 519 dir = tempfile.mkdtemp() 520 self.assertTrue(dir.endswith('aaa')) 521 522 (fd, name) = self.make_temp() 523 os.close(fd) 524 self.assertTrue(name.endswith('bbb')) 525 526 527 class TestGetTempPrefix(BaseTestCase): 528 """Test gettempprefix().""" 529 530 def test_sane_template(self): 531 # gettempprefix returns a nonempty prefix string 532 p = tempfile.gettempprefix() 533 534 self.assertIsInstance(p, str) 535 self.assertGreater(len(p), 0) 536 537 pb = tempfile.gettempprefixb() 538 539 self.assertIsInstance(pb, bytes) 540 self.assertGreater(len(pb), 0) 541 542 def test_usable_template(self): 543 # gettempprefix returns a usable prefix string 544 545 # Create a temp directory, avoiding use of the prefix. 546 # Then attempt to create a file whose name is 547 # prefix + 'xxxxxx.xxx' in that directory. 548 p = tempfile.gettempprefix() + "xxxxxx.xxx" 549 d = tempfile.mkdtemp(prefix="") 550 try: 551 p = os.path.join(d, p) 552 fd = os.open(p, os.O_RDWR | os.O_CREAT) 553 os.close(fd) 554 os.unlink(p) 555 finally: 556 os.rmdir(d) 557 558 559 class TestGetTempDir(BaseTestCase): 560 """Test gettempdir().""" 561 562 def test_directory_exists(self): 563 # gettempdir returns a directory which exists 564 565 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 566 self.assertTrue(os.path.isabs(d) or d == os.curdir, 567 "%r is not an absolute path" % d) 568 self.assertTrue(os.path.isdir(d), 569 "%r is not a directory" % d) 570 571 def test_directory_writable(self): 572 # gettempdir returns a directory writable by the user 573 574 # sneaky: just instantiate a NamedTemporaryFile, which 575 # defaults to writing into the directory returned by 576 # gettempdir. 577 file = tempfile.NamedTemporaryFile() 578 file.write(b"blat") 579 file.close() 580 581 def test_same_thing(self): 582 # gettempdir always returns the same object 583 a = tempfile.gettempdir() 584 b = tempfile.gettempdir() 585 c = tempfile.gettempdirb() 586 587 self.assertTrue(a is b) 588 self.assertNotEqual(type(a), type(c)) 589 self.assertEqual(a, os.fsdecode(c)) 590 591 def test_case_sensitive(self): 592 # gettempdir should not flatten its case 593 # even on a case-insensitive file system 594 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 595 _tempdir, tempfile.tempdir = tempfile.tempdir, None 596 try: 597 with support.EnvironmentVarGuard() as env: 598 # Fake the first env var which is checked as a candidate 599 env["TMPDIR"] = case_sensitive_tempdir 600 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 601 finally: 602 tempfile.tempdir = _tempdir 603 support.rmdir(case_sensitive_tempdir) 604 605 606 class TestMkstemp(BaseTestCase): 607 """Test mkstemp().""" 608 609 def do_create(self, dir=None, pre=None, suf=None): 610 output_type = tempfile._infer_return_type(dir, pre, suf) 611 if dir is None: 612 if output_type is str: 613 dir = tempfile.gettempdir() 614 else: 615 dir = tempfile.gettempdirb() 616 if pre is None: 617 pre = output_type() 618 if suf is None: 619 suf = output_type() 620 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 621 (ndir, nbase) = os.path.split(name) 622 adir = os.path.abspath(dir) 623 self.assertEqual(adir, ndir, 624 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 625 626 try: 627 self.nameCheck(name, dir, pre, suf) 628 finally: 629 os.close(fd) 630 os.unlink(name) 631 632 def test_basic(self): 633 # mkstemp can create files 634 self.do_create() 635 self.do_create(pre="a") 636 self.do_create(suf="b") 637 self.do_create(pre="a", suf="b") 638 self.do_create(pre="aa", suf=".txt") 639 self.do_create(dir=".") 640 641 def test_basic_with_bytes_names(self): 642 # mkstemp can create files when given name parts all 643 # specified as bytes. 644 d = tempfile.gettempdirb() 645 self.do_create(dir=d, suf=b"") 646 self.do_create(dir=d, pre=b"a") 647 self.do_create(dir=d, suf=b"b") 648 self.do_create(dir=d, pre=b"a", suf=b"b") 649 self.do_create(dir=d, pre=b"aa", suf=b".txt") 650 self.do_create(dir=b".") 651 with self.assertRaises(TypeError): 652 self.do_create(dir=".", pre=b"aa", suf=b".txt") 653 with self.assertRaises(TypeError): 654 self.do_create(dir=b".", pre="aa", suf=b".txt") 655 with self.assertRaises(TypeError): 656 self.do_create(dir=b".", pre=b"aa", suf=".txt") 657 658 659 def test_choose_directory(self): 660 # mkstemp can create directories in a user-selected directory 661 dir = tempfile.mkdtemp() 662 try: 663 self.do_create(dir=dir) 664 finally: 665 os.rmdir(dir) 666 667 668 class TestMkdtemp(TestBadTempdir, BaseTestCase): 669 """Test mkdtemp().""" 670 671 def make_temp(self): 672 return tempfile.mkdtemp() 673 674 def do_create(self, dir=None, pre=None, suf=None): 675 output_type = tempfile._infer_return_type(dir, pre, suf) 676 if dir is None: 677 if output_type is str: 678 dir = tempfile.gettempdir() 679 else: 680 dir = tempfile.gettempdirb() 681 if pre is None: 682 pre = output_type() 683 if suf is None: 684 suf = output_type() 685 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 686 687 try: 688 self.nameCheck(name, dir, pre, suf) 689 return name 690 except: 691 os.rmdir(name) 692 raise 693 694 def test_basic(self): 695 # mkdtemp can create directories 696 os.rmdir(self.do_create()) 697 os.rmdir(self.do_create(pre="a")) 698 os.rmdir(self.do_create(suf="b")) 699 os.rmdir(self.do_create(pre="a", suf="b")) 700 os.rmdir(self.do_create(pre="aa", suf=".txt")) 701 702 def test_basic_with_bytes_names(self): 703 # mkdtemp can create directories when given all binary parts 704 d = tempfile.gettempdirb() 705 os.rmdir(self.do_create(dir=d)) 706 os.rmdir(self.do_create(dir=d, pre=b"a")) 707 os.rmdir(self.do_create(dir=d, suf=b"b")) 708 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 709 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 710 with self.assertRaises(TypeError): 711 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 712 with self.assertRaises(TypeError): 713 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 714 with self.assertRaises(TypeError): 715 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 716 717 def test_basic_many(self): 718 # mkdtemp can create many directories (stochastic) 719 extant = list(range(TEST_FILES)) 720 try: 721 for i in extant: 722 extant[i] = self.do_create(pre="aa") 723 finally: 724 for i in extant: 725 if(isinstance(i, str)): 726 os.rmdir(i) 727 728 def test_choose_directory(self): 729 # mkdtemp can create directories in a user-selected directory 730 dir = tempfile.mkdtemp() 731 try: 732 os.rmdir(self.do_create(dir=dir)) 733 finally: 734 os.rmdir(dir) 735 736 @unittest.skipUnless(has_stat, 'os.stat not available') 737 def test_mode(self): 738 # mkdtemp creates directories with the proper mode 739 740 dir = self.do_create() 741 try: 742 mode = stat.S_IMODE(os.stat(dir).st_mode) 743 mode &= 0o777 # Mask off sticky bits inherited from /tmp 744 expected = 0o700 745 if sys.platform == 'win32': 746 # There's no distinction among 'user', 'group' and 'world'; 747 # replicate the 'user' bits. 748 user = expected >> 6 749 expected = user * (1 + 8 + 64) 750 self.assertEqual(mode, expected) 751 finally: 752 os.rmdir(dir) 753 754 def test_collision_with_existing_file(self): 755 # mkdtemp tries another name when a file with 756 # the chosen name already exists 757 with _inside_empty_temp_dir(), \ 758 _mock_candidate_names('aaa', 'aaa', 'bbb'): 759 file = tempfile.NamedTemporaryFile(delete=False) 760 file.close() 761 self.assertTrue(file.name.endswith('aaa')) 762 dir = tempfile.mkdtemp() 763 self.assertTrue(dir.endswith('bbb')) 764 765 def test_collision_with_existing_directory(self): 766 # mkdtemp tries another name when a directory with 767 # the chosen name already exists 768 with _inside_empty_temp_dir(), \ 769 _mock_candidate_names('aaa', 'aaa', 'bbb'): 770 dir1 = tempfile.mkdtemp() 771 self.assertTrue(dir1.endswith('aaa')) 772 dir2 = tempfile.mkdtemp() 773 self.assertTrue(dir2.endswith('bbb')) 774 775 776 class TestMktemp(BaseTestCase): 777 """Test mktemp().""" 778 779 # For safety, all use of mktemp must occur in a private directory. 780 # We must also suppress the RuntimeWarning it generates. 781 def setUp(self): 782 self.dir = tempfile.mkdtemp() 783 super().setUp() 784 785 def tearDown(self): 786 if self.dir: 787 os.rmdir(self.dir) 788 self.dir = None 789 super().tearDown() 790 791 class mktemped: 792 _unlink = os.unlink 793 _bflags = tempfile._bin_openflags 794 795 def __init__(self, dir, pre, suf): 796 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 797 # Create the file. This will raise an exception if it's 798 # mysteriously appeared in the meanwhile. 799 os.close(os.open(self.name, self._bflags, 0o600)) 800 801 def __del__(self): 802 self._unlink(self.name) 803 804 def do_create(self, pre="", suf=""): 805 file = self.mktemped(self.dir, pre, suf) 806 807 self.nameCheck(file.name, self.dir, pre, suf) 808 return file 809 810 def test_basic(self): 811 # mktemp can choose usable file names 812 self.do_create() 813 self.do_create(pre="a") 814 self.do_create(suf="b") 815 self.do_create(pre="a", suf="b") 816 self.do_create(pre="aa", suf=".txt") 817 818 def test_many(self): 819 # mktemp can choose many usable file names (stochastic) 820 extant = list(range(TEST_FILES)) 821 for i in extant: 822 extant[i] = self.do_create(pre="aa") 823 824 ## def test_warning(self): 825 ## # mktemp issues a warning when used 826 ## warnings.filterwarnings("error", 827 ## category=RuntimeWarning, 828 ## message="mktemp") 829 ## self.assertRaises(RuntimeWarning, 830 ## tempfile.mktemp, dir=self.dir) 831 832 833 # We test _TemporaryFileWrapper by testing NamedTemporaryFile. 834 835 836 class TestNamedTemporaryFile(BaseTestCase): 837 """Test NamedTemporaryFile().""" 838 839 def do_create(self, dir=None, pre="", suf="", delete=True): 840 if dir is None: 841 dir = tempfile.gettempdir() 842 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 843 delete=delete) 844 845 self.nameCheck(file.name, dir, pre, suf) 846 return file 847 848 849 def test_basic(self): 850 # NamedTemporaryFile can create files 851 self.do_create() 852 self.do_create(pre="a") 853 self.do_create(suf="b") 854 self.do_create(pre="a", suf="b") 855 self.do_create(pre="aa", suf=".txt") 856 857 def test_method_lookup(self): 858 # Issue #18879: Looking up a temporary file method should keep it 859 # alive long enough. 860 f = self.do_create() 861 wr = weakref.ref(f) 862 write = f.write 863 write2 = f.write 864 del f 865 write(b'foo') 866 del write 867 write2(b'bar') 868 del write2 869 if support.check_impl_detail(cpython=True): 870 # No reference cycle was created. 871 self.assertIsNone(wr()) 872 873 def test_iter(self): 874 # Issue #23700: getting iterator from a temporary file should keep 875 # it alive as long as it's being iterated over 876 lines = [b'spam\n', b'eggs\n', b'beans\n'] 877 def make_file(): 878 f = tempfile.NamedTemporaryFile(mode='w+b') 879 f.write(b''.join(lines)) 880 f.seek(0) 881 return f 882 for i, l in enumerate(make_file()): 883 self.assertEqual(l, lines[i]) 884 self.assertEqual(i, len(lines) - 1) 885 886 def test_creates_named(self): 887 # NamedTemporaryFile creates files with names 888 f = tempfile.NamedTemporaryFile() 889 self.assertTrue(os.path.exists(f.name), 890 "NamedTemporaryFile %s does not exist" % f.name) 891 892 def test_del_on_close(self): 893 # A NamedTemporaryFile is deleted when closed 894 dir = tempfile.mkdtemp() 895 try: 896 f = tempfile.NamedTemporaryFile(dir=dir) 897 f.write(b'blat') 898 f.close() 899 self.assertFalse(os.path.exists(f.name), 900 "NamedTemporaryFile %s exists after close" % f.name) 901 finally: 902 os.rmdir(dir) 903 904 def test_dis_del_on_close(self): 905 # Tests that delete-on-close can be disabled 906 dir = tempfile.mkdtemp() 907 tmp = None 908 try: 909 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 910 tmp = f.name 911 f.write(b'blat') 912 f.close() 913 self.assertTrue(os.path.exists(f.name), 914 "NamedTemporaryFile %s missing after close" % f.name) 915 finally: 916 if tmp is not None: 917 os.unlink(tmp) 918 os.rmdir(dir) 919 920 def test_multiple_close(self): 921 # A NamedTemporaryFile can be closed many times without error 922 f = tempfile.NamedTemporaryFile() 923 f.write(b'abc\n') 924 f.close() 925 f.close() 926 f.close() 927 928 def test_context_manager(self): 929 # A NamedTemporaryFile can be used as a context manager 930 with tempfile.NamedTemporaryFile() as f: 931 self.assertTrue(os.path.exists(f.name)) 932 self.assertFalse(os.path.exists(f.name)) 933 def use_closed(): 934 with f: 935 pass 936 self.assertRaises(ValueError, use_closed) 937 938 def test_no_leak_fd(self): 939 # Issue #21058: don't leak file descriptor when io.open() fails 940 closed = [] 941 os_close = os.close 942 def close(fd): 943 closed.append(fd) 944 os_close(fd) 945 946 with mock.patch('os.close', side_effect=close): 947 with mock.patch('io.open', side_effect=ValueError): 948 self.assertRaises(ValueError, tempfile.NamedTemporaryFile) 949 self.assertEqual(len(closed), 1) 950 951 def test_bad_mode(self): 952 dir = tempfile.mkdtemp() 953 self.addCleanup(support.rmtree, dir) 954 with self.assertRaises(ValueError): 955 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 956 with self.assertRaises(TypeError): 957 tempfile.NamedTemporaryFile(mode=2, dir=dir) 958 self.assertEqual(os.listdir(dir), []) 959 960 # How to test the mode and bufsize parameters? 961 962 class TestSpooledTemporaryFile(BaseTestCase): 963 """Test SpooledTemporaryFile().""" 964 965 def do_create(self, max_size=0, dir=None, pre="", suf=""): 966 if dir is None: 967 dir = tempfile.gettempdir() 968 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 969 970 return file 971 972 973 def test_basic(self): 974 # SpooledTemporaryFile can create files 975 f = self.do_create() 976 self.assertFalse(f._rolled) 977 f = self.do_create(max_size=100, pre="a", suf=".txt") 978 self.assertFalse(f._rolled) 979 980 def test_del_on_close(self): 981 # A SpooledTemporaryFile is deleted when closed 982 dir = tempfile.mkdtemp() 983 try: 984 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 985 self.assertFalse(f._rolled) 986 f.write(b'blat ' * 5) 987 self.assertTrue(f._rolled) 988 filename = f.name 989 f.close() 990 self.assertFalse(isinstance(filename, str) and os.path.exists(filename), 991 "SpooledTemporaryFile %s exists after close" % filename) 992 finally: 993 os.rmdir(dir) 994 995 def test_rewrite_small(self): 996 # A SpooledTemporaryFile can be written to multiple within the max_size 997 f = self.do_create(max_size=30) 998 self.assertFalse(f._rolled) 999 for i in range(5): 1000 f.seek(0, 0) 1001 f.write(b'x' * 20) 1002 self.assertFalse(f._rolled) 1003 1004 def test_write_sequential(self): 1005 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1006 # over afterward 1007 f = self.do_create(max_size=30) 1008 self.assertFalse(f._rolled) 1009 f.write(b'x' * 20) 1010 self.assertFalse(f._rolled) 1011 f.write(b'x' * 10) 1012 self.assertFalse(f._rolled) 1013 f.write(b'x') 1014 self.assertTrue(f._rolled) 1015 1016 def test_writelines(self): 1017 # Verify writelines with a SpooledTemporaryFile 1018 f = self.do_create() 1019 f.writelines((b'x', b'y', b'z')) 1020 f.seek(0) 1021 buf = f.read() 1022 self.assertEqual(buf, b'xyz') 1023 1024 def test_writelines_sequential(self): 1025 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1026 # over afterward 1027 f = self.do_create(max_size=35) 1028 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1029 self.assertFalse(f._rolled) 1030 f.write(b'x') 1031 self.assertTrue(f._rolled) 1032 1033 def test_sparse(self): 1034 # A SpooledTemporaryFile that is written late in the file will extend 1035 # when that occurs 1036 f = self.do_create(max_size=30) 1037 self.assertFalse(f._rolled) 1038 f.seek(100, 0) 1039 self.assertFalse(f._rolled) 1040 f.write(b'x') 1041 self.assertTrue(f._rolled) 1042 1043 def test_fileno(self): 1044 # A SpooledTemporaryFile should roll over to a real file on fileno() 1045 f = self.do_create(max_size=30) 1046 self.assertFalse(f._rolled) 1047 self.assertTrue(f.fileno() > 0) 1048 self.assertTrue(f._rolled) 1049 1050 def test_multiple_close_before_rollover(self): 1051 # A SpooledTemporaryFile can be closed many times without error 1052 f = tempfile.SpooledTemporaryFile() 1053 f.write(b'abc\n') 1054 self.assertFalse(f._rolled) 1055 f.close() 1056 f.close() 1057 f.close() 1058 1059 def test_multiple_close_after_rollover(self): 1060 # A SpooledTemporaryFile can be closed many times without error 1061 f = tempfile.SpooledTemporaryFile(max_size=1) 1062 f.write(b'abc\n') 1063 self.assertTrue(f._rolled) 1064 f.close() 1065 f.close() 1066 f.close() 1067 1068 def test_bound_methods(self): 1069 # It should be OK to steal a bound method from a SpooledTemporaryFile 1070 # and use it independently; when the file rolls over, those bound 1071 # methods should continue to function 1072 f = self.do_create(max_size=30) 1073 read = f.read 1074 write = f.write 1075 seek = f.seek 1076 1077 write(b"a" * 35) 1078 write(b"b" * 35) 1079 seek(0, 0) 1080 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1081 1082 def test_properties(self): 1083 f = tempfile.SpooledTemporaryFile(max_size=10) 1084 f.write(b'x' * 10) 1085 self.assertFalse(f._rolled) 1086 self.assertEqual(f.mode, 'w+b') 1087 self.assertIsNone(f.name) 1088 with self.assertRaises(AttributeError): 1089 f.newlines 1090 with self.assertRaises(AttributeError): 1091 f.encoding 1092 1093 f.write(b'x') 1094 self.assertTrue(f._rolled) 1095 self.assertEqual(f.mode, 'rb+') 1096 self.assertIsNotNone(f.name) 1097 with self.assertRaises(AttributeError): 1098 f.newlines 1099 with self.assertRaises(AttributeError): 1100 f.encoding 1101 1102 def test_text_mode(self): 1103 # Creating a SpooledTemporaryFile with a text mode should produce 1104 # a file object reading and writing (Unicode) text strings. 1105 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10) 1106 f.write("abc\n") 1107 f.seek(0) 1108 self.assertEqual(f.read(), "abc\n") 1109 f.write("def\n") 1110 f.seek(0) 1111 self.assertEqual(f.read(), "abc\ndef\n") 1112 self.assertFalse(f._rolled) 1113 self.assertEqual(f.mode, 'w+') 1114 self.assertIsNone(f.name) 1115 self.assertIsNone(f.newlines) 1116 self.assertIsNone(f.encoding) 1117 1118 f.write("xyzzy\n") 1119 f.seek(0) 1120 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1121 # Check that Ctrl+Z doesn't truncate the file 1122 f.write("foo\x1abar\n") 1123 f.seek(0) 1124 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1125 self.assertTrue(f._rolled) 1126 self.assertEqual(f.mode, 'w+') 1127 self.assertIsNotNone(f.name) 1128 self.assertEqual(f.newlines, os.linesep) 1129 self.assertIsNotNone(f.encoding) 1130 1131 def test_text_newline_and_encoding(self): 1132 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1133 newline='', encoding='utf-8') 1134 f.write("\u039B\r\n") 1135 f.seek(0) 1136 self.assertEqual(f.read(), "\u039B\r\n") 1137 self.assertFalse(f._rolled) 1138 self.assertEqual(f.mode, 'w+') 1139 self.assertIsNone(f.name) 1140 self.assertIsNone(f.newlines) 1141 self.assertIsNone(f.encoding) 1142 1143 f.write("\u039B" * 20 + "\r\n") 1144 f.seek(0) 1145 self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n") 1146 self.assertTrue(f._rolled) 1147 self.assertEqual(f.mode, 'w+') 1148 self.assertIsNotNone(f.name) 1149 self.assertIsNotNone(f.newlines) 1150 self.assertEqual(f.encoding, 'utf-8') 1151 1152 def test_context_manager_before_rollover(self): 1153 # A SpooledTemporaryFile can be used as a context manager 1154 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1155 self.assertFalse(f._rolled) 1156 self.assertFalse(f.closed) 1157 self.assertTrue(f.closed) 1158 def use_closed(): 1159 with f: 1160 pass 1161 self.assertRaises(ValueError, use_closed) 1162 1163 def test_context_manager_during_rollover(self): 1164 # A SpooledTemporaryFile can be used as a context manager 1165 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1166 self.assertFalse(f._rolled) 1167 f.write(b'abc\n') 1168 f.flush() 1169 self.assertTrue(f._rolled) 1170 self.assertFalse(f.closed) 1171 self.assertTrue(f.closed) 1172 def use_closed(): 1173 with f: 1174 pass 1175 self.assertRaises(ValueError, use_closed) 1176 1177 def test_context_manager_after_rollover(self): 1178 # A SpooledTemporaryFile can be used as a context manager 1179 f = tempfile.SpooledTemporaryFile(max_size=1) 1180 f.write(b'abc\n') 1181 f.flush() 1182 self.assertTrue(f._rolled) 1183 with f: 1184 self.assertFalse(f.closed) 1185 self.assertTrue(f.closed) 1186 def use_closed(): 1187 with f: 1188 pass 1189 self.assertRaises(ValueError, use_closed) 1190 1191 def test_truncate_with_size_parameter(self): 1192 # A SpooledTemporaryFile can be truncated to zero size 1193 f = tempfile.SpooledTemporaryFile(max_size=10) 1194 f.write(b'abcdefg\n') 1195 f.seek(0) 1196 f.truncate() 1197 self.assertFalse(f._rolled) 1198 self.assertEqual(f._file.getvalue(), b'') 1199 # A SpooledTemporaryFile can be truncated to a specific size 1200 f = tempfile.SpooledTemporaryFile(max_size=10) 1201 f.write(b'abcdefg\n') 1202 f.truncate(4) 1203 self.assertFalse(f._rolled) 1204 self.assertEqual(f._file.getvalue(), b'abcd') 1205 # A SpooledTemporaryFile rolls over if truncated to large size 1206 f = tempfile.SpooledTemporaryFile(max_size=10) 1207 f.write(b'abcdefg\n') 1208 f.truncate(20) 1209 self.assertTrue(f._rolled) 1210 if has_stat: 1211 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1212 1213 1214 if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1215 1216 class TestTemporaryFile(BaseTestCase): 1217 """Test TemporaryFile().""" 1218 1219 def test_basic(self): 1220 # TemporaryFile can create files 1221 # No point in testing the name params - the file has no name. 1222 tempfile.TemporaryFile() 1223 1224 def test_has_no_name(self): 1225 # TemporaryFile creates files with no names (on this system) 1226 dir = tempfile.mkdtemp() 1227 f = tempfile.TemporaryFile(dir=dir) 1228 f.write(b'blat') 1229 1230 # Sneaky: because this file has no name, it should not prevent 1231 # us from removing the directory it was created in. 1232 try: 1233 os.rmdir(dir) 1234 except: 1235 # cleanup 1236 f.close() 1237 os.rmdir(dir) 1238 raise 1239 1240 def test_multiple_close(self): 1241 # A TemporaryFile can be closed many times without error 1242 f = tempfile.TemporaryFile() 1243 f.write(b'abc\n') 1244 f.close() 1245 f.close() 1246 f.close() 1247 1248 # How to test the mode and bufsize parameters? 1249 def test_mode_and_encoding(self): 1250 1251 def roundtrip(input, *args, **kwargs): 1252 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1253 fileobj.write(input) 1254 fileobj.seek(0) 1255 self.assertEqual(input, fileobj.read()) 1256 1257 roundtrip(b"1234", "w+b") 1258 roundtrip("abdc\n", "w+") 1259 roundtrip("\u039B", "w+", encoding="utf-16") 1260 roundtrip("foo\r\n", "w+", newline="") 1261 1262 def test_no_leak_fd(self): 1263 # Issue #21058: don't leak file descriptor when io.open() fails 1264 closed = [] 1265 os_close = os.close 1266 def close(fd): 1267 closed.append(fd) 1268 os_close(fd) 1269 1270 with mock.patch('os.close', side_effect=close): 1271 with mock.patch('io.open', side_effect=ValueError): 1272 self.assertRaises(ValueError, tempfile.TemporaryFile) 1273 self.assertEqual(len(closed), 1) 1274 1275 1276 1277 # Helper for test_del_on_shutdown 1278 class NulledModules: 1279 def __init__(self, *modules): 1280 self.refs = [mod.__dict__ for mod in modules] 1281 self.contents = [ref.copy() for ref in self.refs] 1282 1283 def __enter__(self): 1284 for d in self.refs: 1285 for key in d: 1286 d[key] = None 1287 1288 def __exit__(self, *exc_info): 1289 for d, c in zip(self.refs, self.contents): 1290 d.clear() 1291 d.update(c) 1292 1293 class TestTemporaryDirectory(BaseTestCase): 1294 """Test TemporaryDirectory().""" 1295 1296 def do_create(self, dir=None, pre="", suf="", recurse=1): 1297 if dir is None: 1298 dir = tempfile.gettempdir() 1299 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1300 self.nameCheck(tmp.name, dir, pre, suf) 1301 # Create a subdirectory and some files 1302 if recurse: 1303 d1 = self.do_create(tmp.name, pre, suf, recurse-1) 1304 d1.name = None 1305 with open(os.path.join(tmp.name, "test.txt"), "wb") as f: 1306 f.write(b"Hello world!") 1307 return tmp 1308 1309 def test_mkdtemp_failure(self): 1310 # Check no additional exception if mkdtemp fails 1311 # Previously would raise AttributeError instead 1312 # (noted as part of Issue #10188) 1313 with tempfile.TemporaryDirectory() as nonexistent: 1314 pass 1315 with self.assertRaises(FileNotFoundError) as cm: 1316 tempfile.TemporaryDirectory(dir=nonexistent) 1317 self.assertEqual(cm.exception.errno, errno.ENOENT) 1318 1319 def test_explicit_cleanup(self): 1320 # A TemporaryDirectory is deleted when cleaned up 1321 dir = tempfile.mkdtemp() 1322 try: 1323 d = self.do_create(dir=dir) 1324 self.assertTrue(os.path.exists(d.name), 1325 "TemporaryDirectory %s does not exist" % d.name) 1326 d.cleanup() 1327 self.assertFalse(os.path.exists(d.name), 1328 "TemporaryDirectory %s exists after cleanup" % d.name) 1329 finally: 1330 os.rmdir(dir) 1331 1332 @support.skip_unless_symlink 1333 def test_cleanup_with_symlink_to_a_directory(self): 1334 # cleanup() should not follow symlinks to directories (issue #12464) 1335 d1 = self.do_create() 1336 d2 = self.do_create(recurse=0) 1337 1338 # Symlink d1/foo -> d2 1339 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1340 1341 # This call to cleanup() should not follow the "foo" symlink 1342 d1.cleanup() 1343 1344 self.assertFalse(os.path.exists(d1.name), 1345 "TemporaryDirectory %s exists after cleanup" % d1.name) 1346 self.assertTrue(os.path.exists(d2.name), 1347 "Directory pointed to by a symlink was deleted") 1348 self.assertEqual(os.listdir(d2.name), ['test.txt'], 1349 "Contents of the directory pointed to by a symlink " 1350 "were deleted") 1351 d2.cleanup() 1352 1353 @support.cpython_only 1354 def test_del_on_collection(self): 1355 # A TemporaryDirectory is deleted when garbage collected 1356 dir = tempfile.mkdtemp() 1357 try: 1358 d = self.do_create(dir=dir) 1359 name = d.name 1360 del d # Rely on refcounting to invoke __del__ 1361 self.assertFalse(os.path.exists(name), 1362 "TemporaryDirectory %s exists after __del__" % name) 1363 finally: 1364 os.rmdir(dir) 1365 1366 def test_del_on_shutdown(self): 1367 # A TemporaryDirectory may be cleaned up during shutdown 1368 with self.do_create() as dir: 1369 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1370 code = """if True: 1371 import builtins 1372 import os 1373 import shutil 1374 import sys 1375 import tempfile 1376 import warnings 1377 1378 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1379 sys.stdout.buffer.write(tmp.name.encode()) 1380 1381 tmp2 = os.path.join(tmp.name, 'test_dir') 1382 os.mkdir(tmp2) 1383 with open(os.path.join(tmp2, "test.txt"), "w") as f: 1384 f.write("Hello world!") 1385 1386 {mod}.tmp = tmp 1387 1388 warnings.filterwarnings("always", category=ResourceWarning) 1389 """.format(dir=dir, mod=mod) 1390 rc, out, err = script_helper.assert_python_ok("-c", code) 1391 tmp_name = out.decode().strip() 1392 self.assertFalse(os.path.exists(tmp_name), 1393 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1394 err = err.decode('utf-8', 'backslashreplace') 1395 self.assertNotIn("Exception ", err) 1396 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1397 1398 def test_exit_on_shutdown(self): 1399 # Issue #22427 1400 with self.do_create() as dir: 1401 code = """if True: 1402 import sys 1403 import tempfile 1404 import warnings 1405 1406 def generator(): 1407 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1408 yield tmp 1409 g = generator() 1410 sys.stdout.buffer.write(next(g).encode()) 1411 1412 warnings.filterwarnings("always", category=ResourceWarning) 1413 """.format(dir=dir) 1414 rc, out, err = script_helper.assert_python_ok("-c", code) 1415 tmp_name = out.decode().strip() 1416 self.assertFalse(os.path.exists(tmp_name), 1417 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1418 err = err.decode('utf-8', 'backslashreplace') 1419 self.assertNotIn("Exception ", err) 1420 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1421 1422 def test_warnings_on_cleanup(self): 1423 # ResourceWarning will be triggered by __del__ 1424 with self.do_create() as dir: 1425 d = self.do_create(dir=dir, recurse=3) 1426 name = d.name 1427 1428 # Check for the resource warning 1429 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1430 warnings.filterwarnings("always", category=ResourceWarning) 1431 del d 1432 support.gc_collect() 1433 self.assertFalse(os.path.exists(name), 1434 "TemporaryDirectory %s exists after __del__" % name) 1435 1436 def test_multiple_close(self): 1437 # Can be cleaned-up many times without error 1438 d = self.do_create() 1439 d.cleanup() 1440 d.cleanup() 1441 d.cleanup() 1442 1443 def test_context_manager(self): 1444 # Can be used as a context manager 1445 d = self.do_create() 1446 with d as name: 1447 self.assertTrue(os.path.exists(name)) 1448 self.assertEqual(name, d.name) 1449 self.assertFalse(os.path.exists(name)) 1450 1451 1452 if __name__ == "__main__": 1453 unittest.main() 1454