# tempfile.py unit tests.
import tempfile
import errno
import io
import os
import signal
import shutil
import sys
import re
import warnings
import contextlib

import unittest
from test import test_support as support

# mktemp() is deliberately exercised below; silence the RuntimeWarning it
# may emit when called from this module.
warnings.filterwarnings("ignore",
                        category=RuntimeWarning,
                        message="mktemp", module=__name__)

if hasattr(os, 'stat'):
    import stat
    has_stat = 1
else:
    has_stat = 0

has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
has_spawnl = hasattr(os, 'spawnl')

# TEST_FILES may need to be tweaked for systems depending on the maximum
# number of files that can be opened at one time (see ulimit -n)
if sys.platform in ('openbsd3', 'openbsd4'):
    TEST_FILES = 48
else:
    TEST_FILES = 100

# This is organized as one test for each chunk of code in tempfile.py,
# in order of their appearance in the file.  Testing which requires
# threads is not done here.

# NOTE: test methods are described with comments rather than docstrings so
# that unittest keeps reporting them by method name.

# Common functionality.
class TC(unittest.TestCase):
    """Base class providing the helpers shared by every test case here."""

    # Matches the six-character random component of a generated name.
    str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")

    def failOnException(self, what, ei=None):
        """Fail the test, reporting the exception described by *ei*.

        *what* names the operation that raised.  *ei* is a
        ``sys.exc_info()`` style tuple; when omitted, the exception
        currently being handled is used.
        """
        if ei is None:
            ei = sys.exc_info()
        self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))

    def nameCheck(self, name, dir, pre, suf):
        """Assert that *name* has the form ``dir/<pre><random><suf>``.

        The directory part is compared by absolute path, and the part of
        the basename between *pre* and *suf* must match ``str_check``.
        """
        (ndir, nbase) = os.path.split(name)
        npre = nbase[:len(pre)]
        nsuf = nbase[len(nbase)-len(suf):]

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
                         "file '%s' not in directory '%s'" % (name, dir))
        self.assertEqual(npre, pre,
                         "file '%s' does not begin with '%s'" % (nbase, pre))
        self.assertEqual(nsuf, suf,
                         "file '%s' does not end with '%s'" % (nbase, suf))

        nbase = nbase[len(pre):len(nbase)-len(suf)]
        self.assertTrue(self.str_check.match(nbase),
                        "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
                        % nbase)

# Every test class registers itself here; test_main() runs the whole list.
test_classes = []

class test_exports(TC):
    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        module_dict = tempfile.__dict__

        expected = {
            "NamedTemporaryFile" : 1,
            "TemporaryFile" : 1,
            "mkstemp" : 1,
            "mkdtemp" : 1,
            "mktemp" : 1,
            "TMP_MAX" : 1,
            "gettempprefix" : 1,
            "gettempdir" : 1,
            "tempdir" : 1,
            "template" : 1,
            "SpooledTemporaryFile" : 1
        }

        unexp = []
        for key in module_dict:
            if key[0] != '_' and key not in expected:
                unexp.append(key)
        self.assertTrue(len(unexp) == 0,
                        "unexpected keys: %s" % unexp)

test_classes.append(test_exports)


class test__RandomNameSequence(TC):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        self.r = tempfile._RandomNameSequence()

    def test_get_six_char_str(self):
        # _RandomNameSequence returns a six-character string
        s = self.r.next()
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)

        seen = {}
        r = self.r
        for i in xrange(TEST_FILES):
            s = r.next()
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen[s] = 1

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol

        i = 0
        r = self.r
        try:
            for s in r:
                i += 1
                if i == 20:
                    break
        except:
            self.failOnException("iteration")

    @unittest.skipUnless(hasattr(os, 'fork'),
        "os.fork is required for this test")
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # Child: send one generated name to the parent and leave.
                os.close(read_fd)
                os.write(write_fd, next(self.r).encode("ascii"))
                os.close(write_fd)
                # bypass the normal exit handlers- leave those to
                # the parent.
                os._exit(0)
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                # best effort to ensure the process can't bleed out
                # via any bugs above
                try:
                    os.kill(pid, signal.SIGKILL)
                except EnvironmentError:
                    pass

                # Read the child's exit status so that it does not stay
                # around as a zombie for the rest of the test run.
                os.waitpid(pid, 0)
            os.close(read_fd)
            os.close(write_fd)
        self.assertNotEqual(child_value, parent_value)


test_classes.append(test__RandomNameSequence)


class test__candidate_tempdir_list(TC):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # _candidate_tempdir_list returns a nonempty list of strings

        cand = tempfile._candidate_tempdir_list()

        self.assertFalse(len(cand) == 0)
        for c in cand:
            self.assertIsInstance(c, basestring)

    def test_wanted_dirs(self):
        # _candidate_tempdir_list contains the expected directories

        # Make sure the interesting environment variables are all set.
        with support.EnvironmentVarGuard() as env:
            for envname in 'TMPDIR', 'TEMP', 'TMP':
                dirname = os.getenv(envname)
                if not dirname:
                    env[envname] = os.path.abspath(envname)

            cand = tempfile._candidate_tempdir_list()

            for envname in 'TMPDIR', 'TEMP', 'TMP':
                dirname = os.getenv(envname)
                if not dirname: raise ValueError
                self.assertIn(dirname, cand)

            try:
                dirname = os.getcwd()
            except (AttributeError, os.error):
                dirname = os.curdir

            self.assertIn(dirname, cand)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.

test_classes.append(test__candidate_tempdir_list)

# We test _get_default_tempdir some more by testing gettempdir.

class TestGetDefaultTempdir(TC):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        # use a private empty directory
        our_temp_directory = tempfile.mkdtemp()
        try:
            # force _get_default_tempdir() to consider our empty directory
            def our_candidate_list():
                return [our_temp_directory]

            with support.swap_attr(tempfile, "_candidate_tempdir_list",
                                   our_candidate_list):
                # verify our directory is empty after _get_default_tempdir()
                tempfile._get_default_tempdir()
                self.assertEqual(os.listdir(our_temp_directory), [])

                def raise_OSError(*args, **kwargs):
                    raise OSError(-1)

                with support.swap_attr(io, "open", raise_OSError):
                    # test again with failing io.open()
                    with self.assertRaises(IOError) as cm:
                        tempfile._get_default_tempdir()
                    self.assertEqual(cm.exception.errno, errno.ENOENT)
                    self.assertEqual(os.listdir(our_temp_directory), [])

                # Keep a handle on the genuine io.open before it is swapped.
                real_open = io.open
                def bad_writer(*args, **kwargs):
                    fp = real_open(*args, **kwargs)
                    fp.write = raise_OSError
                    return fp

                with support.swap_attr(io, "open", bad_writer):
                    # test again with failing write()
                    with self.assertRaises(IOError) as cm:
                        tempfile._get_default_tempdir()
                    self.assertEqual(cm.exception.errno, errno.ENOENT)
                    self.assertEqual(os.listdir(our_temp_directory), [])
        finally:
            shutil.rmtree(our_temp_directory)

test_classes.append(TestGetDefaultTempdir)


class test__get_candidate_names(TC):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)

    def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)

test_classes.append(test__get_candidate_names)


@contextlib.contextmanager
def _inside_empty_temp_dir():
    """Point tempfile.tempdir at a fresh directory for the with-block.

    The directory is removed, with whatever it contains, on exit.
    """
    dir = tempfile.mkdtemp()
    try:
        with support.swap_attr(tempfile, 'tempdir', dir):
            yield
    finally:
        support.rmtree(dir)


def _mock_candidate_names(*names):
    """Return a context manager making tempfile draw names from *names*."""
    return support.swap_attr(tempfile,
                             '_get_candidate_names',
                             lambda: iter(names))


class TestBadTempdir:
    """Mixin of tests for unusable temporary directories.

    The concrete test class must supply make_temp(), which performs the
    creation call under test.
    """

    def test_read_only_directory(self):
        with _inside_empty_temp_dir():
            oldmode = mode = os.stat(tempfile.tempdir).st_mode
            mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
            os.chmod(tempfile.tempdir, mode)
            try:
                if os.access(tempfile.tempdir, os.W_OK):
                    self.skipTest("can't set the directory read-only")
                with self.assertRaises(OSError) as cm:
                    self.make_temp()
                self.assertIn(cm.exception.errno, (errno.EPERM, errno.EACCES))
                self.assertEqual(os.listdir(tempfile.tempdir), [])
            finally:
                # Restore write permission so the directory can be removed.
                os.chmod(tempfile.tempdir, oldmode)

    def test_nonexisting_directory(self):
        with _inside_empty_temp_dir():
            tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
            with support.swap_attr(tempfile, 'tempdir', tempdir):
                with self.assertRaises(OSError) as cm:
                    self.make_temp()
                self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_non_directory(self):
        with _inside_empty_temp_dir():
            tempdir = os.path.join(tempfile.tempdir, 'file')
            open(tempdir, 'wb').close()
            with support.swap_attr(tempfile, 'tempdir', tempdir):
                with self.assertRaises(OSError) as cm:
                    self.make_temp()
                self.assertIn(cm.exception.errno, (errno.ENOTDIR, errno.ENOENT))


class test__mkstemp_inner(TestBadTempdir, TC):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        """Owns one file made by _mkstemp_inner; closes and unlinks it
        when the object is destroyed."""

        _bflags = tempfile._bin_openflags
        _tflags = tempfile._text_openflags
        # Held as class attributes so __del__ does not have to look them
        # up through the os module.
        _close = os.close
        _unlink = os.unlink

        def __init__(self, dir, pre, suf, bin):
            if bin: flags = self._bflags
            else:   flags = self._tflags

            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)

        def write(self, str):
            os.write(self.fd, str)

        def __del__(self):
            self._close(self.fd)
            self._unlink(self.name)

    def do_create(self, dir=None, pre="", suf="", bin=1):
        """Create a mkstemped file, check its name and return the object.

        *dir* defaults to tempfile.gettempdir().
        """
        if dir is None:
            dir = tempfile.gettempdir()
        try:
            file = self.mkstemped(dir, pre, suf, bin)
        except:
            self.failOnException("_mkstemp_inner")

        self.nameCheck(file.name, dir, pre, suf)
        return file

    def test_basic(self):
        # _mkstemp_inner can create files
        self.do_create().write("blat")
        self.do_create(pre="a").write("blat")
        self.do_create(suf="b").write("blat")
        self.do_create(pre="a", suf="b").write("blat")
        self.do_create(pre="aa", suf=".txt").write("blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        extant = range(TEST_FILES)
        for i in extant:
            extant[i] = self.do_create(pre="aa")

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir).write("blat")
        finally:
            os.rmdir(dir)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        expected = 0o600
        if sys.platform in ('win32', 'os2emx'):
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits.
            user = expected >> 6
            expected = user * (1 + 8 + 64)
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        if support.verbose:
            v="v"
        else:
            v="q"

        file = self.do_create()
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
                              "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        if sys.platform in ('win32',):
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester
        else:
            decorated = sys.executable

        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                    "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        self.do_create(bin=0).write("blat\n")
        # XXX should test that the file really is a text file

    def make_temp(self):
        """Creation call used by the TestBadTempdir and collision tests."""
        return tempfile._mkstemp_inner(tempfile.gettempdir(),
                                       tempfile.template,
                                       '',
                                       tempfile._bin_openflags)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            (fd1, name1) = self.make_temp()
            os.close(fd1)
            self.assertTrue(name1.endswith('aaa'))

            (fd2, name2) = self.make_temp()
            os.close(fd2)
            self.assertTrue(name2.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            dir = tempfile.mkdtemp()
            self.assertTrue(dir.endswith('aaa'))

            (fd, name) = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))

test_classes.append(test__mkstemp_inner)


class test_gettempprefix(TC):
    """Test gettempprefix()."""

    def test_sane_template(self):
        # gettempprefix returns a nonempty prefix string
        p = tempfile.gettempprefix()

        self.assertIsInstance(p, basestring)
        self.assertTrue(len(p) > 0)

    def test_usable_template(self):
        # gettempprefix returns a usable prefix string

        # Create a temp directory, avoiding use of the prefix.
        # Then attempt to create a file whose name is
        # prefix + 'xxxxxx.xxx' in that directory.
        p = tempfile.gettempprefix() + "xxxxxx.xxx"
        d = tempfile.mkdtemp(prefix="")
        try:
            p = os.path.join(d, p)
            try:
                fd = os.open(p, os.O_RDWR | os.O_CREAT)
            except:
                self.failOnException("os.open")
            os.close(fd)
            os.unlink(p)
        finally:
            os.rmdir(d)

test_classes.append(test_gettempprefix)


class test_gettempdir(TC):
    """Test gettempdir()."""

    def test_directory_exists(self):
        # gettempdir returns a directory which exists

        dir = tempfile.gettempdir()
        self.assertTrue(os.path.isabs(dir) or dir == os.curdir,
                     "%s is not an absolute path" % dir)
        self.assertTrue(os.path.isdir(dir),
                     "%s is not a directory" % dir)

    def test_directory_writable(self):
        # gettempdir returns a directory writable by the user

        # sneaky: just instantiate a NamedTemporaryFile, which
        # defaults to writing into the directory returned by
        # gettempdir.
        try:
            file = tempfile.NamedTemporaryFile()
            file.write("blat")
            file.close()
        except:
            self.failOnException("create file in %s" % tempfile.gettempdir())

    def test_same_thing(self):
        # gettempdir always returns the same object
        a = tempfile.gettempdir()
        b = tempfile.gettempdir()

        self.assertTrue(a is b)

test_classes.append(test_gettempdir)


class test_mkstemp(TC):
    """Test mkstemp()."""

    def do_create(self, dir=None, pre="", suf=""):
        """Create a file with mkstemp(), check its name, then remove it.

        *dir* defaults to tempfile.gettempdir().
        """
        if dir is None:
            dir = tempfile.gettempdir()
        try:
            (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
            (ndir, nbase) = os.path.split(name)
            adir = os.path.abspath(dir)
            self.assertEqual(adir, ndir,
                "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
        except:
            self.failOnException("mkstemp")

        try:
            self.nameCheck(name, dir, pre, suf)
        finally:
            os.close(fd)
            os.unlink(name)

    def test_basic(self):
        # mkstemp can create files
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")
        self.do_create(dir=".")

    def test_choose_directory(self):
        # mkstemp can create directories in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir)
        finally:
            os.rmdir(dir)

test_classes.append(test_mkstemp)


class test_mkdtemp(TestBadTempdir, TC):
    """Test mkdtemp()."""

    def make_temp(self):
        """Creation call used by the TestBadTempdir tests."""
        return tempfile.mkdtemp()

    def do_create(self, dir=None, pre="", suf=""):
        """Create a directory with mkdtemp(), check its name, return it.

        The directory is removed again if the name check fails; otherwise
        the caller is responsible for removing it.
        """
        if dir is None:
            dir = tempfile.gettempdir()
        try:
            name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
        except:
            self.failOnException("mkdtemp")

        try:
            self.nameCheck(name, dir, pre, suf)
            return name
        except:
            os.rmdir(name)
            raise

    def test_basic(self):
        # mkdtemp can create directories
        os.rmdir(self.do_create())
        os.rmdir(self.do_create(pre="a"))
        os.rmdir(self.do_create(suf="b"))
        os.rmdir(self.do_create(pre="a", suf="b"))
        os.rmdir(self.do_create(pre="aa", suf=".txt"))

    def test_basic_many(self):
        # mkdtemp can create many directories (stochastic)
        extant = range(TEST_FILES)
        try:
            for i in extant:
                extant[i] = self.do_create(pre="aa")
        finally:
            # Slots still holding their integer index were never created.
            for i in extant:
                if(isinstance(i, basestring)):
                    os.rmdir(i)

    def test_choose_directory(self):
        # mkdtemp can create directories in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            os.rmdir(self.do_create(dir=dir))
        finally:
            os.rmdir(dir)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_mode(self):
        # mkdtemp creates directories with the proper mode

        dir = self.do_create()
        try:
            mode = stat.S_IMODE(os.stat(dir).st_mode)
            mode &= 0o777 # Mask off sticky bits inherited from /tmp
            expected = 0o700
            if sys.platform in ('win32', 'os2emx'):
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits.
                user = expected >> 6
                expected = user * (1 + 8 + 64)
            self.assertEqual(mode, expected)
        finally:
            os.rmdir(dir)

    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            file = tempfile.NamedTemporaryFile(delete=False)
            file.close()
            self.assertTrue(file.name.endswith('aaa'))
            dir = tempfile.mkdtemp()
            self.assertTrue(dir.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            dir1 = tempfile.mkdtemp()
            self.assertTrue(dir1.endswith('aaa'))
            dir2 = tempfile.mkdtemp()
            self.assertTrue(dir2.endswith('bbb'))

test_classes.append(test_mkdtemp)


class test_mktemp(TC):
    """Test mktemp()."""

    # For safety, all use of mktemp must occur in a private directory.
    # We must also suppress the RuntimeWarning it generates.
    def setUp(self):
        self.dir = tempfile.mkdtemp()

    def tearDown(self):
        if self.dir:
            os.rmdir(self.dir)
            self.dir = None

    class mktemped:
        """Owns one file created at a mktemp() name; unlinks it when the
        object is destroyed."""

        _unlink = os.unlink
        _bflags = tempfile._bin_openflags

        def __init__(self, dir, pre, suf):
            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
            # Create the file.  This will raise an exception if it's
            # mysteriously appeared in the meanwhile.
            os.close(os.open(self.name, self._bflags, 0o600))

        def __del__(self):
            self._unlink(self.name)

    def do_create(self, pre="", suf=""):
        """Create a mktemped file in self.dir, check its name, return it."""
        try:
            file = self.mktemped(self.dir, pre, suf)
        except:
            self.failOnException("mktemp")

        self.nameCheck(file.name, self.dir, pre, suf)
        return file

    def test_basic(self):
        # mktemp can choose usable file names
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")

    def test_many(self):
        # mktemp can choose many usable file names (stochastic)
        extant = range(TEST_FILES)
        for i in extant:
            extant[i] = self.do_create(pre="aa")

##     def test_warning(self):
##         # mktemp issues a warning when used
##         warnings.filterwarnings("error",
##                                 category=RuntimeWarning,
##                                 message="mktemp")
##         self.assertRaises(RuntimeWarning,
##                           tempfile.mktemp, dir=self.dir)

test_classes.append(test_mktemp)


# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
class test_NamedTemporaryFile(TC):
    """Test NamedTemporaryFile()."""

    # NOTE: test methods are described with comments rather than docstrings
    # so that unittest keeps reporting them by method name.

    def do_create(self, dir=None, pre="", suf="", delete=True):
        """Create a NamedTemporaryFile, check its name and return it.

        *dir* defaults to tempfile.gettempdir().  Any exception raised by
        the constructor is reported as a test failure.
        """
        if dir is None:
            dir = tempfile.gettempdir()
        try:
            file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
                                               delete=delete)
        except:
            self.failOnException("NamedTemporaryFile")

        self.nameCheck(file.name, dir, pre, suf)
        return file


    def test_basic(self):
        # NamedTemporaryFile can create files
        # Close each file explicitly instead of waiting for it to be
        # garbage collected.
        self.do_create().close()
        self.do_create(pre="a").close()
        self.do_create(suf="b").close()
        self.do_create(pre="a", suf="b").close()
        self.do_create(pre="aa", suf=".txt").close()

    def test_creates_named(self):
        # NamedTemporaryFile creates files with names
        f = tempfile.NamedTemporaryFile()
        try:
            self.assertTrue(os.path.exists(f.name),
                            "NamedTemporaryFile %s does not exist" % f.name)
        finally:
            f.close()

    def test_del_on_close(self):
        # A NamedTemporaryFile is deleted when closed
        dir = tempfile.mkdtemp()
        try:
            f = tempfile.NamedTemporaryFile(dir=dir)
            f.write('blat')
            f.close()
            self.assertFalse(os.path.exists(f.name),
                        "NamedTemporaryFile %s exists after close" % f.name)
        finally:
            os.rmdir(dir)

    def test_dis_del_on_close(self):
        # Tests that delete-on-close can be disabled
        dir = tempfile.mkdtemp()
        tmp = None
        try:
            f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
            tmp = f.name
            f.write('blat')
            f.close()
            self.assertTrue(os.path.exists(f.name),
                        "NamedTemporaryFile %s missing after close" % f.name)
        finally:
            if tmp is not None:
                os.unlink(tmp)
            os.rmdir(dir)

    def test_multiple_close(self):
        # A NamedTemporaryFile can be closed many times without error
        f = tempfile.NamedTemporaryFile()
        f.write('abc\n')
        f.close()
        try:
            f.close()
            f.close()
        except:
            self.failOnException("close")

    def test_context_manager(self):
        # A NamedTemporaryFile can be used as a context manager
        with tempfile.NamedTemporaryFile() as f:
            self.assertTrue(os.path.exists(f.name))
        self.assertFalse(os.path.exists(f.name))
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_no_leak_fd(self):
        # Issue #21058: don't leak file descriptor when fdopen() fails
        old_close = os.close
        old_fdopen = os.fdopen
        closed = []
        def close(fd):
            closed.append(fd)
            # Really close the descriptor: a recording-only stub would make
            # this test leak the very fd it is checking on.
            old_close(fd)
        def fdopen(*args):
            raise ValueError()
        os.close = close
        os.fdopen = fdopen
        try:
            self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
            self.assertEqual(len(closed), 1)
        finally:
            os.close = old_close
            os.fdopen = old_fdopen

    def test_bad_mode(self):
        dir = tempfile.mkdtemp()
        self.addCleanup(support.rmtree, dir)
        with self.assertRaises(TypeError):
            tempfile.NamedTemporaryFile(mode=(), dir=dir)
        self.assertEqual(os.listdir(dir), [])

    # How to test the mode and bufsize parameters?
test_classes.append(test_NamedTemporaryFile)

class test_SpooledTemporaryFile(TC):
    """Test SpooledTemporaryFile()."""

    # Test methods carry comments, not docstrings, so that unittest keeps
    # reporting them by method name.

    def do_create(self, max_size=0, dir=None, pre="", suf=""):
        """Return a new SpooledTemporaryFile built from the given arguments.

        A missing *dir* is replaced by tempfile.gettempdir(); a constructor
        exception is turned into a test failure.
        """
        if dir is None:
            dir = tempfile.gettempdir()
        options = dict(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
        try:
            spooled = tempfile.SpooledTemporaryFile(**options)
        except:
            self.failOnException("SpooledTemporaryFile")

        return spooled


    def test_basic(self):
        # SpooledTemporaryFile can create files
        plain = self.do_create()
        self.assertFalse(plain._rolled)
        decorated = self.do_create(max_size=100, pre="a", suf=".txt")
        self.assertFalse(decorated._rolled)

    def test_del_on_close(self):
        # A SpooledTemporaryFile is deleted when closed
        workdir = tempfile.mkdtemp()
        try:
            spooled = tempfile.SpooledTemporaryFile(max_size=10, dir=workdir)
            self.assertFalse(spooled._rolled)
            # 25 characters: more than max_size, so the file must roll over
            spooled.write('blat ' * 5)
            self.assertTrue(spooled._rolled)
            filename = spooled.name
            spooled.close()
            self.assertFalse(os.path.exists(filename),
                        "SpooledTemporaryFile %s exists after close" % filename)
        finally:
            os.rmdir(workdir)

    def test_rewrite_small(self):
        # A SpooledTemporaryFile can be written to multiple within the max_size
        spooled = self.do_create(max_size=30)
        self.assertFalse(spooled._rolled)
        for _ in range(5):
            # rewinding first keeps the content at 20 characters
            spooled.seek(0, 0)
            spooled.write('x' * 20)
        self.assertFalse(spooled._rolled)

    def test_write_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spooled = self.do_create(max_size=30)
        self.assertFalse(spooled._rolled)
        spooled.write('x' * 20)
        self.assertFalse(spooled._rolled)
        spooled.write('x' * 10)
        self.assertFalse(spooled._rolled)
        spooled.write('x')
        self.assertTrue(spooled._rolled)

    def test_writelines(self):
        # Verify writelines with a SpooledTemporaryFile
        spooled = self.do_create()
        spooled.writelines((b'x', b'y', b'z'))
        spooled.seek(0)
        content = spooled.read()
        self.assertEqual(content, b'xyz')

    def test_writelines_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spooled = self.do_create(max_size=35)
        spooled.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
        self.assertFalse(spooled._rolled)
        spooled.write(b'x')
        self.assertTrue(spooled._rolled)

    def test_xreadlines(self):
        spooled = self.do_create(max_size=20)
        spooled.write(b'abc\n' * 5)
        spooled.seek(0)
        self.assertFalse(spooled._rolled)
        self.assertEqual(list(spooled.xreadlines()), [b'abc\n'] * 5)
        spooled.write(b'x\ny')
        self.assertTrue(spooled._rolled)
        spooled.seek(0)
        self.assertEqual(list(spooled.xreadlines()),
                         [b'abc\n'] * 5 + [b'x\n', b'y'])

    def test_sparse(self):
        # A SpooledTemporaryFile that is written late in the file will extend
        # when that occurs
        spooled = self.do_create(max_size=30)
        self.assertFalse(spooled._rolled)
        spooled.seek(100, 0)
        self.assertFalse(spooled._rolled)
        spooled.write('x')
        self.assertTrue(spooled._rolled)

    def test_fileno(self):
        # A SpooledTemporaryFile should roll over to a real file on fileno()
        spooled = self.do_create(max_size=30)
        self.assertFalse(spooled._rolled)
        self.assertTrue(spooled.fileno() > 0)
        self.assertTrue(spooled._rolled)

    def test_multiple_close_before_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spooled = tempfile.SpooledTemporaryFile()
        spooled.write('abc\n')
        self.assertFalse(spooled._rolled)
        spooled.close()
        try:
            spooled.close()
            spooled.close()
        except:
            self.failOnException("close")

    def test_multiple_close_after_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spooled = tempfile.SpooledTemporaryFile(max_size=1)
        spooled.write('abc\n')
        self.assertTrue(spooled._rolled)
        spooled.close()
        try:
            spooled.close()
            spooled.close()
        except:
            self.failOnException("close")

    def test_bound_methods(self):
        # It should be OK to steal a bound method from a SpooledTemporaryFile
        # and use it independently; when the file rolls over, those bound
        # methods should continue to function
        spooled = self.do_create(max_size=30)
        read, write, seek = spooled.read, spooled.write, spooled.seek

        write("a" * 35)
        write("b" * 35)
        seek(0, 0)
        self.assertTrue(read(70) == 'a'*35 + 'b'*35)

    def test_properties(self):
        spooled = tempfile.SpooledTemporaryFile(max_size=10)
        spooled.write(b'x' * 10)

        # still in memory
        self.assertFalse(spooled._rolled)
        self.assertEqual(spooled.mode, 'w+b')
        self.assertIsNone(spooled.name)
        with self.assertRaises(AttributeError):
            spooled.newlines
        with self.assertRaises(AttributeError):
            spooled.encoding

        spooled.write(b'x')

        # now rolled over
        self.assertTrue(spooled._rolled)
        self.assertEqual(spooled.mode, 'w+b')
        self.assertIsNotNone(spooled.name)
        with self.assertRaises(AttributeError):
            spooled.newlines
        with self.assertRaises(AttributeError):
            spooled.encoding

    def test_context_manager_before_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spooled:
            self.assertFalse(spooled._rolled)
            self.assertFalse(spooled.closed)
        self.assertTrue(spooled.closed)
        def use_closed():
            with spooled:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_during_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spooled:
            self.assertFalse(spooled._rolled)
            spooled.write('abc\n')
            spooled.flush()
            self.assertTrue(spooled._rolled)
            self.assertFalse(spooled.closed)
        self.assertTrue(spooled.closed)
        def use_closed():
            with spooled:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_after_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        spooled = tempfile.SpooledTemporaryFile(max_size=1)
        spooled.write('abc\n')
        spooled.flush()
        self.assertTrue(spooled._rolled)
        with spooled:
            self.assertFalse(spooled.closed)
        self.assertTrue(spooled.closed)
        def use_closed():
            with spooled:
                pass
        self.assertRaises(ValueError, use_closed)


test_classes.append(test_SpooledTemporaryFile)


class test_TemporaryFile(TC):
    """Test TemporaryFile()."""

    def test_basic(self):
        # TemporaryFile can create files
        # No point in testing the name params - the file has no name.
        try:
            tempfile.TemporaryFile()
        except:
            self.failOnException("TemporaryFile")

    def test_has_no_name(self):
        # TemporaryFile creates files with no names (on this system)
        parent = tempfile.mkdtemp()
        anonymous = tempfile.TemporaryFile(dir=parent)
        anonymous.write('blat')

        # Sneaky: because this file has no name, it should not prevent
        # us from removing the directory it was created in.
        try:
            os.rmdir(parent)
        except:
            failure = sys.exc_info()
            # cleanup
            anonymous.close()
            os.rmdir(parent)
            self.failOnException("rmdir", failure)

    def test_multiple_close(self):
        # A TemporaryFile can be closed many times without error
        anonymous = tempfile.TemporaryFile()
        anonymous.write('abc\n')
        anonymous.close()
        try:
            anonymous.close()
            anonymous.close()
        except:
            self.failOnException("close")

    # How to test the mode and bufsize parameters?


# Where TemporaryFile is just another name for NamedTemporaryFile these
# tests are not registered.
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
    test_classes.append(test_TemporaryFile)

def test_main():
    """Run every registered test class."""
    support.run_unittest(*test_classes)

if __name__ == "__main__":
    test_main()