Home | History | Annotate | Download | only in test
      1 # tempfile.py unit tests.
      2 import tempfile
      3 import errno
      4 import io
      5 import os
      6 import signal
      7 import sys
      8 import re
      9 import warnings
     10 import contextlib
     11 import weakref
     12 from unittest import mock
     13 
     14 import unittest
     15 from test import support
     16 from test.support import script_helper
     17 
     18 
     19 if hasattr(os, 'stat'):
     20     import stat
     21     has_stat = 1
     22 else:
     23     has_stat = 0
     24 
     25 has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
     26 has_spawnl = hasattr(os, 'spawnl')
     27 
     28 # TEST_FILES may need to be tweaked for systems depending on the maximum
     29 # number of files that can be opened at one time (see ulimit -n)
     30 if sys.platform.startswith('openbsd'):
     31     TEST_FILES = 48
     32 else:
     33     TEST_FILES = 100
     34 
     35 # This is organized as one test for each chunk of code in tempfile.py,
     36 # in order of their appearance in the file.  Testing which requires
     37 # threads is not done here.
     38 
     39 class TestLowLevelInternals(unittest.TestCase):
     40     def test_infer_return_type_singles(self):
     41         self.assertIs(str, tempfile._infer_return_type(''))
     42         self.assertIs(bytes, tempfile._infer_return_type(b''))
     43         self.assertIs(str, tempfile._infer_return_type(None))
     44 
     45     def test_infer_return_type_multiples(self):
     46         self.assertIs(str, tempfile._infer_return_type('', ''))
     47         self.assertIs(bytes, tempfile._infer_return_type(b'', b''))
     48         with self.assertRaises(TypeError):
     49             tempfile._infer_return_type('', b'')
     50         with self.assertRaises(TypeError):
     51             tempfile._infer_return_type(b'', '')
     52 
     53     def test_infer_return_type_multiples_and_none(self):
     54         self.assertIs(str, tempfile._infer_return_type(None, ''))
     55         self.assertIs(str, tempfile._infer_return_type('', None))
     56         self.assertIs(str, tempfile._infer_return_type(None, None))
     57         self.assertIs(bytes, tempfile._infer_return_type(b'', None))
     58         self.assertIs(bytes, tempfile._infer_return_type(None, b''))
     59         with self.assertRaises(TypeError):
     60             tempfile._infer_return_type('', None, b'')
     61         with self.assertRaises(TypeError):
     62             tempfile._infer_return_type(b'', None, '')
     63 
     64 
     65 # Common functionality.
     66 
     67 class BaseTestCase(unittest.TestCase):
     68 
     69     str_check = re.compile(r"^[a-z0-9_-]{8}$")
     70     b_check = re.compile(br"^[a-z0-9_-]{8}$")
     71 
     72     def setUp(self):
     73         self._warnings_manager = support.check_warnings()
     74         self._warnings_manager.__enter__()
     75         warnings.filterwarnings("ignore", category=RuntimeWarning,
     76                                 message="mktemp", module=__name__)
     77 
     78     def tearDown(self):
     79         self._warnings_manager.__exit__(None, None, None)
     80 
     81 
     82     def nameCheck(self, name, dir, pre, suf):
     83         (ndir, nbase) = os.path.split(name)
     84         npre  = nbase[:len(pre)]
     85         nsuf  = nbase[len(nbase)-len(suf):]
     86 
     87         if dir is not None:
     88             self.assertIs(type(name), str if type(dir) is str else bytes,
     89                           "unexpected return type")
     90         if pre is not None:
     91             self.assertIs(type(name), str if type(pre) is str else bytes,
     92                           "unexpected return type")
     93         if suf is not None:
     94             self.assertIs(type(name), str if type(suf) is str else bytes,
     95                           "unexpected return type")
     96         if (dir, pre, suf) == (None, None, None):
     97             self.assertIs(type(name), str, "default return type must be str")
     98 
     99         # check for equality of the absolute paths!
    100         self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
    101                          "file %r not in directory %r" % (name, dir))
    102         self.assertEqual(npre, pre,
    103                          "file %r does not begin with %r" % (nbase, pre))
    104         self.assertEqual(nsuf, suf,
    105                          "file %r does not end with %r" % (nbase, suf))
    106 
    107         nbase = nbase[len(pre):len(nbase)-len(suf)]
    108         check = self.str_check if isinstance(nbase, str) else self.b_check
    109         self.assertTrue(check.match(nbase),
    110                         "random characters %r do not match %r"
    111                         % (nbase, check.pattern))
    112 
    113 
    114 class TestExports(BaseTestCase):
    115     def test_exports(self):
    116         # There are no surprising symbols in the tempfile module
    117         dict = tempfile.__dict__
    118 
    119         expected = {
    120             "NamedTemporaryFile" : 1,
    121             "TemporaryFile" : 1,
    122             "mkstemp" : 1,
    123             "mkdtemp" : 1,
    124             "mktemp" : 1,
    125             "TMP_MAX" : 1,
    126             "gettempprefix" : 1,
    127             "gettempprefixb" : 1,
    128             "gettempdir" : 1,
    129             "gettempdirb" : 1,
    130             "tempdir" : 1,
    131             "template" : 1,
    132             "SpooledTemporaryFile" : 1,
    133             "TemporaryDirectory" : 1,
    134         }
    135 
    136         unexp = []
    137         for key in dict:
    138             if key[0] != '_' and key not in expected:
    139                 unexp.append(key)
    140         self.assertTrue(len(unexp) == 0,
    141                         "unexpected keys: %s" % unexp)
    142 
    143 
    144 class TestRandomNameSequence(BaseTestCase):
    145     """Test the internal iterator object _RandomNameSequence."""
    146 
    147     def setUp(self):
    148         self.r = tempfile._RandomNameSequence()
    149         super().setUp()
    150 
    151     def test_get_six_char_str(self):
    152         # _RandomNameSequence returns a six-character string
    153         s = next(self.r)
    154         self.nameCheck(s, '', '', '')
    155 
    156     def test_many(self):
    157         # _RandomNameSequence returns no duplicate strings (stochastic)
    158 
    159         dict = {}
    160         r = self.r
    161         for i in range(TEST_FILES):
    162             s = next(r)
    163             self.nameCheck(s, '', '', '')
    164             self.assertNotIn(s, dict)
    165             dict[s] = 1
    166 
    167     def supports_iter(self):
    168         # _RandomNameSequence supports the iterator protocol
    169 
    170         i = 0
    171         r = self.r
    172         for s in r:
    173             i += 1
    174             if i == 20:
    175                 break
    176 
    177     @unittest.skipUnless(hasattr(os, 'fork'),
    178         "os.fork is required for this test")
    179     def test_process_awareness(self):
    180         # ensure that the random source differs between
    181         # child and parent.
    182         read_fd, write_fd = os.pipe()
    183         pid = None
    184         try:
    185             pid = os.fork()
    186             if not pid:
    187                 os.close(read_fd)
    188                 os.write(write_fd, next(self.r).encode("ascii"))
    189                 os.close(write_fd)
    190                 # bypass the normal exit handlers- leave those to
    191                 # the parent.
    192                 os._exit(0)
    193             parent_value = next(self.r)
    194             child_value = os.read(read_fd, len(parent_value)).decode("ascii")
    195         finally:
    196             if pid:
    197                 # best effort to ensure the process can't bleed out
    198                 # via any bugs above
    199                 try:
    200                     os.kill(pid, signal.SIGKILL)
    201                 except OSError:
    202                     pass
    203             os.close(read_fd)
    204             os.close(write_fd)
    205         self.assertNotEqual(child_value, parent_value)
    206 
    207 
    208 
    209 class TestCandidateTempdirList(BaseTestCase):
    210     """Test the internal function _candidate_tempdir_list."""
    211 
    212     def test_nonempty_list(self):
    213         # _candidate_tempdir_list returns a nonempty list of strings
    214 
    215         cand = tempfile._candidate_tempdir_list()
    216 
    217         self.assertFalse(len(cand) == 0)
    218         for c in cand:
    219             self.assertIsInstance(c, str)
    220 
    221     def test_wanted_dirs(self):
    222         # _candidate_tempdir_list contains the expected directories
    223 
    224         # Make sure the interesting environment variables are all set.
    225         with support.EnvironmentVarGuard() as env:
    226             for envname in 'TMPDIR', 'TEMP', 'TMP':
    227                 dirname = os.getenv(envname)
    228                 if not dirname:
    229                     env[envname] = os.path.abspath(envname)
    230 
    231             cand = tempfile._candidate_tempdir_list()
    232 
    233             for envname in 'TMPDIR', 'TEMP', 'TMP':
    234                 dirname = os.getenv(envname)
    235                 if not dirname: raise ValueError
    236                 self.assertIn(dirname, cand)
    237 
    238             try:
    239                 dirname = os.getcwd()
    240             except (AttributeError, OSError):
    241                 dirname = os.curdir
    242 
    243             self.assertIn(dirname, cand)
    244 
    245             # Not practical to try to verify the presence of OS-specific
    246             # paths in this list.
    247 
    248 
    249 # We test _get_default_tempdir some more by testing gettempdir.
    250 
    251 class TestGetDefaultTempdir(BaseTestCase):
    252     """Test _get_default_tempdir()."""
    253 
    254     def test_no_files_left_behind(self):
    255         # use a private empty directory
    256         with tempfile.TemporaryDirectory() as our_temp_directory:
    257             # force _get_default_tempdir() to consider our empty directory
    258             def our_candidate_list():
    259                 return [our_temp_directory]
    260 
    261             with support.swap_attr(tempfile, "_candidate_tempdir_list",
    262                                    our_candidate_list):
    263                 # verify our directory is empty after _get_default_tempdir()
    264                 tempfile._get_default_tempdir()
    265                 self.assertEqual(os.listdir(our_temp_directory), [])
    266 
    267                 def raise_OSError(*args, **kwargs):
    268                     raise OSError()
    269 
    270                 with support.swap_attr(io, "open", raise_OSError):
    271                     # test again with failing io.open()
    272                     with self.assertRaises(FileNotFoundError):
    273                         tempfile._get_default_tempdir()
    274                     self.assertEqual(os.listdir(our_temp_directory), [])
    275 
    276                 open = io.open
    277                 def bad_writer(*args, **kwargs):
    278                     fp = open(*args, **kwargs)
    279                     fp.write = raise_OSError
    280                     return fp
    281 
    282                 with support.swap_attr(io, "open", bad_writer):
    283                     # test again with failing write()
    284                     with self.assertRaises(FileNotFoundError):
    285                         tempfile._get_default_tempdir()
    286                     self.assertEqual(os.listdir(our_temp_directory), [])
    287 
    288 
    289 class TestGetCandidateNames(BaseTestCase):
    290     """Test the internal function _get_candidate_names."""
    291 
    292     def test_retval(self):
    293         # _get_candidate_names returns a _RandomNameSequence object
    294         obj = tempfile._get_candidate_names()
    295         self.assertIsInstance(obj, tempfile._RandomNameSequence)
    296 
    297     def test_same_thing(self):
    298         # _get_candidate_names always returns the same object
    299         a = tempfile._get_candidate_names()
    300         b = tempfile._get_candidate_names()
    301 
    302         self.assertTrue(a is b)
    303 
    304 
    305 @contextlib.contextmanager
    306 def _inside_empty_temp_dir():
    307     dir = tempfile.mkdtemp()
    308     try:
    309         with support.swap_attr(tempfile, 'tempdir', dir):
    310             yield
    311     finally:
    312         support.rmtree(dir)
    313 
    314 
    315 def _mock_candidate_names(*names):
    316     return support.swap_attr(tempfile,
    317                              '_get_candidate_names',
    318                              lambda: iter(names))
    319 
    320 
    321 class TestBadTempdir:
    322 
    323     def test_read_only_directory(self):
    324         with _inside_empty_temp_dir():
    325             oldmode = mode = os.stat(tempfile.tempdir).st_mode
    326             mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
    327             os.chmod(tempfile.tempdir, mode)
    328             try:
    329                 if os.access(tempfile.tempdir, os.W_OK):
    330                     self.skipTest("can't set the directory read-only")
    331                 with self.assertRaises(PermissionError):
    332                     self.make_temp()
    333                 self.assertEqual(os.listdir(tempfile.tempdir), [])
    334             finally:
    335                 os.chmod(tempfile.tempdir, oldmode)
    336 
    337     def test_nonexisting_directory(self):
    338         with _inside_empty_temp_dir():
    339             tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
    340             with support.swap_attr(tempfile, 'tempdir', tempdir):
    341                 with self.assertRaises(FileNotFoundError):
    342                     self.make_temp()
    343 
    344     def test_non_directory(self):
    345         with _inside_empty_temp_dir():
    346             tempdir = os.path.join(tempfile.tempdir, 'file')
    347             open(tempdir, 'wb').close()
    348             with support.swap_attr(tempfile, 'tempdir', tempdir):
    349                 with self.assertRaises((NotADirectoryError, FileNotFoundError)):
    350                     self.make_temp()
    351 
    352 
class TestMkstempInner(TestBadTempdir, BaseTestCase):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        """One file created by _mkstemp_inner().

        Holds the descriptor and name; the descriptor is closed and the
        file unlinked when the object is finalized (see __del__).
        """
        _bflags = tempfile._bin_openflags
        _tflags = tempfile._text_openflags
        # NOTE(review): presumably cached on the class so that __del__ does
        # not depend on the os module global at interpreter shutdown --
        # confirm.
        _close = os.close
        _unlink = os.unlink

        def __init__(self, dir, pre, suf, bin):
            # A true `bin` selects the binary open flags, else the text ones.
            if bin: flags = self._bflags
            else:   flags = self._tflags

            output_type = tempfile._infer_return_type(dir, pre, suf)
            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type)

        def write(self, str):
            # Writes straight to the descriptor; the callers in this class
            # pass bytes.
            os.write(self.fd, str)

        def __del__(self):
            self._close(self.fd)
            self._unlink(self.name)

    def do_create(self, dir=None, pre=None, suf=None, bin=1):
        """Create a mkstemped file, check its name and return the wrapper.

        Omitted parts default to the temp directory and to empty
        prefix/suffix of the type inferred from the given arguments.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        file = self.mkstemped(dir, pre, suf, bin)

        self.nameCheck(file.name, dir, pre, suf)
        return file

    def test_basic(self):
        # _mkstemp_inner can create files
        self.do_create().write(b"blat")
        self.do_create(pre="a").write(b"blat")
        self.do_create(suf="b").write(b"blat")
        self.do_create(pre="a", suf="b").write(b"blat")
        self.do_create(pre="aa", suf=".txt").write(b"blat")

    def test_basic_with_bytes_names(self):
        # _mkstemp_inner can create files when given name parts all
        # specified as bytes.
        dir_b = tempfile.gettempdirb()
        self.do_create(dir=dir_b, suf=b"").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a").write(b"blat")
        self.do_create(dir=dir_b, suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
        # Can't mix str & binary types in the args.
        with self.assertRaises(TypeError):
            self.do_create(dir="", suf=b"").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre="").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        # The list keeps every wrapper alive until the method returns.
        extant = list(range(TEST_FILES))
        for i in extant:
            extant[i] = self.do_create(pre="aa")

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            # NOTE(review): os.rmdir() below needs the directory to be empty,
            # so this relies on the mkstemped object being finalized right
            # after the statement (true under reference counting) -- confirm
            # for other implementations.
            self.do_create(dir=dir).write(b"blat")
        finally:
            os.rmdir(dir)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        expected = 0o600
        if sys.platform == 'win32':
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits.
            user = expected >> 6
            expected = user * (1 + 8 + 64)
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        # Verbosity flag passed on to the helper script.
        if support.verbose:
            v="v"
        else:
            v="q"

        file = self.do_create()
        self.assertEqual(os.get_inheritable(file.fd), False)
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
                              "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        if sys.platform == 'win32':
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester
        else:
            decorated = sys.executable

        # A negative return value is reported as a fatal signal, a positive
        # one as a failure reported by the child.
        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                    "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        # A text file is truncated at the first Ctrl+Z byte
        f = self.do_create(bin=0)
        f.write(b"blat\x1a")
        f.write(b"extra\n")
        os.lseek(f.fd, 0, os.SEEK_SET)
        self.assertEqual(os.read(f.fd, 20), b"blat")

    def make_temp(self):
        # Used by the inherited TestBadTempdir tests and by the collision
        # tests below; returns the (fd, name) pair from _mkstemp_inner().
        return tempfile._mkstemp_inner(tempfile.gettempdir(),
                                       tempfile.gettempprefix(),
                                       '',
                                       tempfile._bin_openflags,
                                       str)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            (fd1, name1) = self.make_temp()
            os.close(fd1)
            self.assertTrue(name1.endswith('aaa'))

            (fd2, name2) = self.make_temp()
            os.close(fd2)
            self.assertTrue(name2.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            dir = tempfile.mkdtemp()
            self.assertTrue(dir.endswith('aaa'))

            (fd, name) = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))
    525 
    526 
    527 class TestGetTempPrefix(BaseTestCase):
    528     """Test gettempprefix()."""
    529 
    530     def test_sane_template(self):
    531         # gettempprefix returns a nonempty prefix string
    532         p = tempfile.gettempprefix()
    533 
    534         self.assertIsInstance(p, str)
    535         self.assertGreater(len(p), 0)
    536 
    537         pb = tempfile.gettempprefixb()
    538 
    539         self.assertIsInstance(pb, bytes)
    540         self.assertGreater(len(pb), 0)
    541 
    542     def test_usable_template(self):
    543         # gettempprefix returns a usable prefix string
    544 
    545         # Create a temp directory, avoiding use of the prefix.
    546         # Then attempt to create a file whose name is
    547         # prefix + 'xxxxxx.xxx' in that directory.
    548         p = tempfile.gettempprefix() + "xxxxxx.xxx"
    549         d = tempfile.mkdtemp(prefix="")
    550         try:
    551             p = os.path.join(d, p)
    552             fd = os.open(p, os.O_RDWR | os.O_CREAT)
    553             os.close(fd)
    554             os.unlink(p)
    555         finally:
    556             os.rmdir(d)
    557 
    558 
    559 class TestGetTempDir(BaseTestCase):
    560     """Test gettempdir()."""
    561 
    562     def test_directory_exists(self):
    563         # gettempdir returns a directory which exists
    564 
    565         for d in (tempfile.gettempdir(), tempfile.gettempdirb()):
    566             self.assertTrue(os.path.isabs(d) or d == os.curdir,
    567                             "%r is not an absolute path" % d)
    568             self.assertTrue(os.path.isdir(d),
    569                             "%r is not a directory" % d)
    570 
    571     def test_directory_writable(self):
    572         # gettempdir returns a directory writable by the user
    573 
    574         # sneaky: just instantiate a NamedTemporaryFile, which
    575         # defaults to writing into the directory returned by
    576         # gettempdir.
    577         file = tempfile.NamedTemporaryFile()
    578         file.write(b"blat")
    579         file.close()
    580 
    581     def test_same_thing(self):
    582         # gettempdir always returns the same object
    583         a = tempfile.gettempdir()
    584         b = tempfile.gettempdir()
    585         c = tempfile.gettempdirb()
    586 
    587         self.assertTrue(a is b)
    588         self.assertNotEqual(type(a), type(c))
    589         self.assertEqual(a, os.fsdecode(c))
    590 
    591     def test_case_sensitive(self):
    592         # gettempdir should not flatten its case
    593         # even on a case-insensitive file system
    594         case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
    595         _tempdir, tempfile.tempdir = tempfile.tempdir, None
    596         try:
    597             with support.EnvironmentVarGuard() as env:
    598                 # Fake the first env var which is checked as a candidate
    599                 env["TMPDIR"] = case_sensitive_tempdir
    600                 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
    601         finally:
    602             tempfile.tempdir = _tempdir
    603             support.rmdir(case_sensitive_tempdir)
    604 
    605 
    606 class TestMkstemp(BaseTestCase):
    607     """Test mkstemp()."""
    608 
    609     def do_create(self, dir=None, pre=None, suf=None):
    610         output_type = tempfile._infer_return_type(dir, pre, suf)
    611         if dir is None:
    612             if output_type is str:
    613                 dir = tempfile.gettempdir()
    614             else:
    615                 dir = tempfile.gettempdirb()
    616         if pre is None:
    617             pre = output_type()
    618         if suf is None:
    619             suf = output_type()
    620         (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
    621         (ndir, nbase) = os.path.split(name)
    622         adir = os.path.abspath(dir)
    623         self.assertEqual(adir, ndir,
    624             "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
    625 
    626         try:
    627             self.nameCheck(name, dir, pre, suf)
    628         finally:
    629             os.close(fd)
    630             os.unlink(name)
    631 
    632     def test_basic(self):
    633         # mkstemp can create files
    634         self.do_create()
    635         self.do_create(pre="a")
    636         self.do_create(suf="b")
    637         self.do_create(pre="a", suf="b")
    638         self.do_create(pre="aa", suf=".txt")
    639         self.do_create(dir=".")
    640 
    641     def test_basic_with_bytes_names(self):
    642         # mkstemp can create files when given name parts all
    643         # specified as bytes.
    644         d = tempfile.gettempdirb()
    645         self.do_create(dir=d, suf=b"")
    646         self.do_create(dir=d, pre=b"a")
    647         self.do_create(dir=d, suf=b"b")
    648         self.do_create(dir=d, pre=b"a", suf=b"b")
    649         self.do_create(dir=d, pre=b"aa", suf=b".txt")
    650         self.do_create(dir=b".")
    651         with self.assertRaises(TypeError):
    652             self.do_create(dir=".", pre=b"aa", suf=b".txt")
    653         with self.assertRaises(TypeError):
    654             self.do_create(dir=b".", pre="aa", suf=b".txt")
    655         with self.assertRaises(TypeError):
    656             self.do_create(dir=b".", pre=b"aa", suf=".txt")
    657 
    658 
    659     def test_choose_directory(self):
    660         # mkstemp can create directories in a user-selected directory
    661         dir = tempfile.mkdtemp()
    662         try:
    663             self.do_create(dir=dir)
    664         finally:
    665             os.rmdir(dir)
    666 
    667 
    668 class TestMkdtemp(TestBadTempdir, BaseTestCase):
    669     """Test mkdtemp()."""
    670 
    671     def make_temp(self):
    672         return tempfile.mkdtemp()
    673 
    674     def do_create(self, dir=None, pre=None, suf=None):
    675         output_type = tempfile._infer_return_type(dir, pre, suf)
    676         if dir is None:
    677             if output_type is str:
    678                 dir = tempfile.gettempdir()
    679             else:
    680                 dir = tempfile.gettempdirb()
    681         if pre is None:
    682             pre = output_type()
    683         if suf is None:
    684             suf = output_type()
    685         name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
    686 
    687         try:
    688             self.nameCheck(name, dir, pre, suf)
    689             return name
    690         except:
    691             os.rmdir(name)
    692             raise
    693 
    694     def test_basic(self):
    695         # mkdtemp can create directories
    696         os.rmdir(self.do_create())
    697         os.rmdir(self.do_create(pre="a"))
    698         os.rmdir(self.do_create(suf="b"))
    699         os.rmdir(self.do_create(pre="a", suf="b"))
    700         os.rmdir(self.do_create(pre="aa", suf=".txt"))
    701 
    702     def test_basic_with_bytes_names(self):
    703         # mkdtemp can create directories when given all binary parts
    704         d = tempfile.gettempdirb()
    705         os.rmdir(self.do_create(dir=d))
    706         os.rmdir(self.do_create(dir=d, pre=b"a"))
    707         os.rmdir(self.do_create(dir=d, suf=b"b"))
    708         os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b"))
    709         os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt"))
    710         with self.assertRaises(TypeError):
    711             os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt"))
    712         with self.assertRaises(TypeError):
    713             os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt"))
    714         with self.assertRaises(TypeError):
    715             os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt"))
    716 
    717     def test_basic_many(self):
    718         # mkdtemp can create many directories (stochastic)
    719         extant = list(range(TEST_FILES))
    720         try:
    721             for i in extant:
    722                 extant[i] = self.do_create(pre="aa")
    723         finally:
    724             for i in extant:
    725                 if(isinstance(i, str)):
    726                     os.rmdir(i)
    727 
    728     def test_choose_directory(self):
    729         # mkdtemp can create directories in a user-selected directory
    730         dir = tempfile.mkdtemp()
    731         try:
    732             os.rmdir(self.do_create(dir=dir))
    733         finally:
    734             os.rmdir(dir)
    735 
    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_mode(self):
        # mkdtemp creates directories with the proper mode
        path = self.do_create()
        try:
            # Keep only the rwx permission bits; this masks off sticky bits
            # inherited from /tmp.
            actual = stat.S_IMODE(os.stat(path).st_mode) & 0o777
            if sys.platform == 'win32':
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits.
                owner_bits = 0o700 >> 6
                wanted = owner_bits * (1 + 8 + 64)
            else:
                wanted = 0o700
            self.assertEqual(actual, wanted)
        finally:
            os.rmdir(path)
    753 
    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir():
            with _mock_candidate_names('aaa', 'aaa', 'bbb'):
                # A regular file ends up with the 'aaa' name ...
                blocker = tempfile.NamedTemporaryFile(delete=False)
                blocker.close()
                self.assertTrue(blocker.name.endswith('aaa'))
                # ... so the directory has to settle for 'bbb'.
                new_dir = tempfile.mkdtemp()
                self.assertTrue(new_dir.endswith('bbb'))
    764 
    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir():
            with _mock_candidate_names('aaa', 'aaa', 'bbb'):
                # The first directory ends up with the 'aaa' name ...
                first = tempfile.mkdtemp()
                self.assertTrue(first.endswith('aaa'))
                # ... so the second one has to settle for 'bbb'.
                second = tempfile.mkdtemp()
                self.assertTrue(second.endswith('bbb'))
    774 
    775 
    776 class TestMktemp(BaseTestCase):
    777     """Test mktemp()."""
    778 
    779     # For safety, all use of mktemp must occur in a private directory.
    780     # We must also suppress the RuntimeWarning it generates.
    781     def setUp(self):
    782         self.dir = tempfile.mkdtemp()
    783         super().setUp()
    784 
    785     def tearDown(self):
    786         if self.dir:
    787             os.rmdir(self.dir)
    788             self.dir = None
    789         super().tearDown()
    790 
    791     class mktemped:
    792         _unlink = os.unlink
    793         _bflags = tempfile._bin_openflags
    794 
    795         def __init__(self, dir, pre, suf):
    796             self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
    797             # Create the file.  This will raise an exception if it's
    798             # mysteriously appeared in the meanwhile.
    799             os.close(os.open(self.name, self._bflags, 0o600))
    800 
    801         def __del__(self):
    802             self._unlink(self.name)
    803 
    804     def do_create(self, pre="", suf=""):
    805         file = self.mktemped(self.dir, pre, suf)
    806 
    807         self.nameCheck(file.name, self.dir, pre, suf)
    808         return file
    809 
    810     def test_basic(self):
    811         # mktemp can choose usable file names
    812         self.do_create()
    813         self.do_create(pre="a")
    814         self.do_create(suf="b")
    815         self.do_create(pre="a", suf="b")
    816         self.do_create(pre="aa", suf=".txt")
    817 
    818     def test_many(self):
    819         # mktemp can choose many usable file names (stochastic)
    820         extant = list(range(TEST_FILES))
    821         for i in extant:
    822             extant[i] = self.do_create(pre="aa")
    823 
    824 ##     def test_warning(self):
    825 ##         # mktemp issues a warning when used
    826 ##         warnings.filterwarnings("error",
    827 ##                                 category=RuntimeWarning,
    828 ##                                 message="mktemp")
    829 ##         self.assertRaises(RuntimeWarning,
    830 ##                           tempfile.mktemp, dir=self.dir)
    831 
    832 
    833 # We test _TemporaryFileWrapper by testing NamedTemporaryFile.
    834 
    835 
    836 class TestNamedTemporaryFile(BaseTestCase):
    837     """Test NamedTemporaryFile()."""
    838 
    839     def do_create(self, dir=None, pre="", suf="", delete=True):
    840         if dir is None:
    841             dir = tempfile.gettempdir()
    842         file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
    843                                            delete=delete)
    844 
    845         self.nameCheck(file.name, dir, pre, suf)
    846         return file
    847 
    848 
    849     def test_basic(self):
    850         # NamedTemporaryFile can create files
    851         self.do_create()
    852         self.do_create(pre="a")
    853         self.do_create(suf="b")
    854         self.do_create(pre="a", suf="b")
    855         self.do_create(pre="aa", suf=".txt")
    856 
    857     def test_method_lookup(self):
    858         # Issue #18879: Looking up a temporary file method should keep it
    859         # alive long enough.
    860         f = self.do_create()
    861         wr = weakref.ref(f)
    862         write = f.write
    863         write2 = f.write
    864         del f
    865         write(b'foo')
    866         del write
    867         write2(b'bar')
    868         del write2
    869         if support.check_impl_detail(cpython=True):
    870             # No reference cycle was created.
    871             self.assertIsNone(wr())
    872 
    873     def test_iter(self):
    874         # Issue #23700: getting iterator from a temporary file should keep
    875         # it alive as long as it's being iterated over
    876         lines = [b'spam\n', b'eggs\n', b'beans\n']
    877         def make_file():
    878             f = tempfile.NamedTemporaryFile(mode='w+b')
    879             f.write(b''.join(lines))
    880             f.seek(0)
    881             return f
    882         for i, l in enumerate(make_file()):
    883             self.assertEqual(l, lines[i])
    884         self.assertEqual(i, len(lines) - 1)
    885 
    886     def test_creates_named(self):
    887         # NamedTemporaryFile creates files with names
    888         f = tempfile.NamedTemporaryFile()
    889         self.assertTrue(os.path.exists(f.name),
    890                         "NamedTemporaryFile %s does not exist" % f.name)
    891 
    892     def test_del_on_close(self):
    893         # A NamedTemporaryFile is deleted when closed
    894         dir = tempfile.mkdtemp()
    895         try:
    896             f = tempfile.NamedTemporaryFile(dir=dir)
    897             f.write(b'blat')
    898             f.close()
    899             self.assertFalse(os.path.exists(f.name),
    900                         "NamedTemporaryFile %s exists after close" % f.name)
    901         finally:
    902             os.rmdir(dir)
    903 
    904     def test_dis_del_on_close(self):
    905         # Tests that delete-on-close can be disabled
    906         dir = tempfile.mkdtemp()
    907         tmp = None
    908         try:
    909             f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
    910             tmp = f.name
    911             f.write(b'blat')
    912             f.close()
    913             self.assertTrue(os.path.exists(f.name),
    914                         "NamedTemporaryFile %s missing after close" % f.name)
    915         finally:
    916             if tmp is not None:
    917                 os.unlink(tmp)
    918             os.rmdir(dir)
    919 
    920     def test_multiple_close(self):
    921         # A NamedTemporaryFile can be closed many times without error
    922         f = tempfile.NamedTemporaryFile()
    923         f.write(b'abc\n')
    924         f.close()
    925         f.close()
    926         f.close()
    927 
    928     def test_context_manager(self):
    929         # A NamedTemporaryFile can be used as a context manager
    930         with tempfile.NamedTemporaryFile() as f:
    931             self.assertTrue(os.path.exists(f.name))
    932         self.assertFalse(os.path.exists(f.name))
    933         def use_closed():
    934             with f:
    935                 pass
    936         self.assertRaises(ValueError, use_closed)
    937 
    938     def test_no_leak_fd(self):
    939         # Issue #21058: don't leak file descriptor when io.open() fails
    940         closed = []
    941         os_close = os.close
    942         def close(fd):
    943             closed.append(fd)
    944             os_close(fd)
    945 
    946         with mock.patch('os.close', side_effect=close):
    947             with mock.patch('io.open', side_effect=ValueError):
    948                 self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
    949                 self.assertEqual(len(closed), 1)
    950 
    951     def test_bad_mode(self):
    952         dir = tempfile.mkdtemp()
    953         self.addCleanup(support.rmtree, dir)
    954         with self.assertRaises(ValueError):
    955             tempfile.NamedTemporaryFile(mode='wr', dir=dir)
    956         with self.assertRaises(TypeError):
    957             tempfile.NamedTemporaryFile(mode=2, dir=dir)
    958         self.assertEqual(os.listdir(dir), [])
    959 
    960     # How to test the mode and bufsize parameters?
    961 
    962 class TestSpooledTemporaryFile(BaseTestCase):
    963     """Test SpooledTemporaryFile()."""
    964 
    965     def do_create(self, max_size=0, dir=None, pre="", suf=""):
    966         if dir is None:
    967             dir = tempfile.gettempdir()
    968         file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
    969 
    970         return file
    971 
    972 
    973     def test_basic(self):
    974         # SpooledTemporaryFile can create files
    975         f = self.do_create()
    976         self.assertFalse(f._rolled)
    977         f = self.do_create(max_size=100, pre="a", suf=".txt")
    978         self.assertFalse(f._rolled)
    979 
    980     def test_del_on_close(self):
    981         # A SpooledTemporaryFile is deleted when closed
    982         dir = tempfile.mkdtemp()
    983         try:
    984             f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
    985             self.assertFalse(f._rolled)
    986             f.write(b'blat ' * 5)
    987             self.assertTrue(f._rolled)
    988             filename = f.name
    989             f.close()
    990             self.assertFalse(isinstance(filename, str) and os.path.exists(filename),
    991                         "SpooledTemporaryFile %s exists after close" % filename)
    992         finally:
    993             os.rmdir(dir)
    994 
    995     def test_rewrite_small(self):
    996         # A SpooledTemporaryFile can be written to multiple within the max_size
    997         f = self.do_create(max_size=30)
    998         self.assertFalse(f._rolled)
    999         for i in range(5):
   1000             f.seek(0, 0)
   1001             f.write(b'x' * 20)
   1002         self.assertFalse(f._rolled)
   1003 
   1004     def test_write_sequential(self):
   1005         # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
   1006         # over afterward
   1007         f = self.do_create(max_size=30)
   1008         self.assertFalse(f._rolled)
   1009         f.write(b'x' * 20)
   1010         self.assertFalse(f._rolled)
   1011         f.write(b'x' * 10)
   1012         self.assertFalse(f._rolled)
   1013         f.write(b'x')
   1014         self.assertTrue(f._rolled)
   1015 
   1016     def test_writelines(self):
   1017         # Verify writelines with a SpooledTemporaryFile
   1018         f = self.do_create()
   1019         f.writelines((b'x', b'y', b'z'))
   1020         f.seek(0)
   1021         buf = f.read()
   1022         self.assertEqual(buf, b'xyz')
   1023 
   1024     def test_writelines_sequential(self):
   1025         # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
   1026         # over afterward
   1027         f = self.do_create(max_size=35)
   1028         f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
   1029         self.assertFalse(f._rolled)
   1030         f.write(b'x')
   1031         self.assertTrue(f._rolled)
   1032 
   1033     def test_sparse(self):
   1034         # A SpooledTemporaryFile that is written late in the file will extend
   1035         # when that occurs
   1036         f = self.do_create(max_size=30)
   1037         self.assertFalse(f._rolled)
   1038         f.seek(100, 0)
   1039         self.assertFalse(f._rolled)
   1040         f.write(b'x')
   1041         self.assertTrue(f._rolled)
   1042 
   1043     def test_fileno(self):
   1044         # A SpooledTemporaryFile should roll over to a real file on fileno()
   1045         f = self.do_create(max_size=30)
   1046         self.assertFalse(f._rolled)
   1047         self.assertTrue(f.fileno() > 0)
   1048         self.assertTrue(f._rolled)
   1049 
   1050     def test_multiple_close_before_rollover(self):
   1051         # A SpooledTemporaryFile can be closed many times without error
   1052         f = tempfile.SpooledTemporaryFile()
   1053         f.write(b'abc\n')
   1054         self.assertFalse(f._rolled)
   1055         f.close()
   1056         f.close()
   1057         f.close()
   1058 
   1059     def test_multiple_close_after_rollover(self):
   1060         # A SpooledTemporaryFile can be closed many times without error
   1061         f = tempfile.SpooledTemporaryFile(max_size=1)
   1062         f.write(b'abc\n')
   1063         self.assertTrue(f._rolled)
   1064         f.close()
   1065         f.close()
   1066         f.close()
   1067 
   1068     def test_bound_methods(self):
   1069         # It should be OK to steal a bound method from a SpooledTemporaryFile
   1070         # and use it independently; when the file rolls over, those bound
   1071         # methods should continue to function
   1072         f = self.do_create(max_size=30)
   1073         read = f.read
   1074         write = f.write
   1075         seek = f.seek
   1076 
   1077         write(b"a" * 35)
   1078         write(b"b" * 35)
   1079         seek(0, 0)
   1080         self.assertEqual(read(70), b'a'*35 + b'b'*35)
   1081 
   1082     def test_properties(self):
   1083         f = tempfile.SpooledTemporaryFile(max_size=10)
   1084         f.write(b'x' * 10)
   1085         self.assertFalse(f._rolled)
   1086         self.assertEqual(f.mode, 'w+b')
   1087         self.assertIsNone(f.name)
   1088         with self.assertRaises(AttributeError):
   1089             f.newlines
   1090         with self.assertRaises(AttributeError):
   1091             f.encoding
   1092 
   1093         f.write(b'x')
   1094         self.assertTrue(f._rolled)
   1095         self.assertEqual(f.mode, 'rb+')
   1096         self.assertIsNotNone(f.name)
   1097         with self.assertRaises(AttributeError):
   1098             f.newlines
   1099         with self.assertRaises(AttributeError):
   1100             f.encoding
   1101 
   1102     def test_text_mode(self):
   1103         # Creating a SpooledTemporaryFile with a text mode should produce
   1104         # a file object reading and writing (Unicode) text strings.
   1105         f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10)
   1106         f.write("abc\n")
   1107         f.seek(0)
   1108         self.assertEqual(f.read(), "abc\n")
   1109         f.write("def\n")
   1110         f.seek(0)
   1111         self.assertEqual(f.read(), "abc\ndef\n")
   1112         self.assertFalse(f._rolled)
   1113         self.assertEqual(f.mode, 'w+')
   1114         self.assertIsNone(f.name)
   1115         self.assertIsNone(f.newlines)
   1116         self.assertIsNone(f.encoding)
   1117 
   1118         f.write("xyzzy\n")
   1119         f.seek(0)
   1120         self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
   1121         # Check that Ctrl+Z doesn't truncate the file
   1122         f.write("foo\x1abar\n")
   1123         f.seek(0)
   1124         self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
   1125         self.assertTrue(f._rolled)
   1126         self.assertEqual(f.mode, 'w+')
   1127         self.assertIsNotNone(f.name)
   1128         self.assertEqual(f.newlines, os.linesep)
   1129         self.assertIsNotNone(f.encoding)
   1130 
   1131     def test_text_newline_and_encoding(self):
   1132         f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
   1133                                           newline='', encoding='utf-8')
   1134         f.write("\u039B\r\n")
   1135         f.seek(0)
   1136         self.assertEqual(f.read(), "\u039B\r\n")
   1137         self.assertFalse(f._rolled)
   1138         self.assertEqual(f.mode, 'w+')
   1139         self.assertIsNone(f.name)
   1140         self.assertIsNone(f.newlines)
   1141         self.assertIsNone(f.encoding)
   1142 
   1143         f.write("\u039B" * 20 + "\r\n")
   1144         f.seek(0)
   1145         self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
   1146         self.assertTrue(f._rolled)
   1147         self.assertEqual(f.mode, 'w+')
   1148         self.assertIsNotNone(f.name)
   1149         self.assertIsNotNone(f.newlines)
   1150         self.assertEqual(f.encoding, 'utf-8')
   1151 
   1152     def test_context_manager_before_rollover(self):
   1153         # A SpooledTemporaryFile can be used as a context manager
   1154         with tempfile.SpooledTemporaryFile(max_size=1) as f:
   1155             self.assertFalse(f._rolled)
   1156             self.assertFalse(f.closed)
   1157         self.assertTrue(f.closed)
   1158         def use_closed():
   1159             with f:
   1160                 pass
   1161         self.assertRaises(ValueError, use_closed)
   1162 
   1163     def test_context_manager_during_rollover(self):
   1164         # A SpooledTemporaryFile can be used as a context manager
   1165         with tempfile.SpooledTemporaryFile(max_size=1) as f:
   1166             self.assertFalse(f._rolled)
   1167             f.write(b'abc\n')
   1168             f.flush()
   1169             self.assertTrue(f._rolled)
   1170             self.assertFalse(f.closed)
   1171         self.assertTrue(f.closed)
   1172         def use_closed():
   1173             with f:
   1174                 pass
   1175         self.assertRaises(ValueError, use_closed)
   1176 
   1177     def test_context_manager_after_rollover(self):
   1178         # A SpooledTemporaryFile can be used as a context manager
   1179         f = tempfile.SpooledTemporaryFile(max_size=1)
   1180         f.write(b'abc\n')
   1181         f.flush()
   1182         self.assertTrue(f._rolled)
   1183         with f:
   1184             self.assertFalse(f.closed)
   1185         self.assertTrue(f.closed)
   1186         def use_closed():
   1187             with f:
   1188                 pass
   1189         self.assertRaises(ValueError, use_closed)
   1190 
   1191     def test_truncate_with_size_parameter(self):
   1192         # A SpooledTemporaryFile can be truncated to zero size
   1193         f = tempfile.SpooledTemporaryFile(max_size=10)
   1194         f.write(b'abcdefg\n')
   1195         f.seek(0)
   1196         f.truncate()
   1197         self.assertFalse(f._rolled)
   1198         self.assertEqual(f._file.getvalue(), b'')
   1199         # A SpooledTemporaryFile can be truncated to a specific size
   1200         f = tempfile.SpooledTemporaryFile(max_size=10)
   1201         f.write(b'abcdefg\n')
   1202         f.truncate(4)
   1203         self.assertFalse(f._rolled)
   1204         self.assertEqual(f._file.getvalue(), b'abcd')
   1205         # A SpooledTemporaryFile rolls over if truncated to large size
   1206         f = tempfile.SpooledTemporaryFile(max_size=10)
   1207         f.write(b'abcdefg\n')
   1208         f.truncate(20)
   1209         self.assertTrue(f._rolled)
   1210         if has_stat:
   1211             self.assertEqual(os.fstat(f.fileno()).st_size, 20)
   1212 
   1213 
   1214 if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
   1215 
   1216     class TestTemporaryFile(BaseTestCase):
   1217         """Test TemporaryFile()."""
   1218 
   1219         def test_basic(self):
   1220             # TemporaryFile can create files
   1221             # No point in testing the name params - the file has no name.
   1222             tempfile.TemporaryFile()
   1223 
   1224         def test_has_no_name(self):
   1225             # TemporaryFile creates files with no names (on this system)
   1226             dir = tempfile.mkdtemp()
   1227             f = tempfile.TemporaryFile(dir=dir)
   1228             f.write(b'blat')
   1229 
   1230             # Sneaky: because this file has no name, it should not prevent
   1231             # us from removing the directory it was created in.
   1232             try:
   1233                 os.rmdir(dir)
   1234             except:
   1235                 # cleanup
   1236                 f.close()
   1237                 os.rmdir(dir)
   1238                 raise
   1239 
   1240         def test_multiple_close(self):
   1241             # A TemporaryFile can be closed many times without error
   1242             f = tempfile.TemporaryFile()
   1243             f.write(b'abc\n')
   1244             f.close()
   1245             f.close()
   1246             f.close()
   1247 
   1248         # How to test the mode and bufsize parameters?
   1249         def test_mode_and_encoding(self):
   1250 
   1251             def roundtrip(input, *args, **kwargs):
   1252                 with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
   1253                     fileobj.write(input)
   1254                     fileobj.seek(0)
   1255                     self.assertEqual(input, fileobj.read())
   1256 
   1257             roundtrip(b"1234", "w+b")
   1258             roundtrip("abdc\n", "w+")
   1259             roundtrip("\u039B", "w+", encoding="utf-16")
   1260             roundtrip("foo\r\n", "w+", newline="")
   1261 
   1262         def test_no_leak_fd(self):
   1263             # Issue #21058: don't leak file descriptor when io.open() fails
   1264             closed = []
   1265             os_close = os.close
   1266             def close(fd):
   1267                 closed.append(fd)
   1268                 os_close(fd)
   1269 
   1270             with mock.patch('os.close', side_effect=close):
   1271                 with mock.patch('io.open', side_effect=ValueError):
   1272                     self.assertRaises(ValueError, tempfile.TemporaryFile)
   1273                     self.assertEqual(len(closed), 1)
   1274 
   1275 
   1276 
   1277 # Helper for test_del_on_shutdown
   1278 class NulledModules:
   1279     def __init__(self, *modules):
   1280         self.refs = [mod.__dict__ for mod in modules]
   1281         self.contents = [ref.copy() for ref in self.refs]
   1282 
   1283     def __enter__(self):
   1284         for d in self.refs:
   1285             for key in d:
   1286                 d[key] = None
   1287 
   1288     def __exit__(self, *exc_info):
   1289         for d, c in zip(self.refs, self.contents):
   1290             d.clear()
   1291             d.update(c)
   1292 
   1293 class TestTemporaryDirectory(BaseTestCase):
   1294     """Test TemporaryDirectory()."""
   1295 
   1296     def do_create(self, dir=None, pre="", suf="", recurse=1):
   1297         if dir is None:
   1298             dir = tempfile.gettempdir()
   1299         tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf)
   1300         self.nameCheck(tmp.name, dir, pre, suf)
   1301         # Create a subdirectory and some files
   1302         if recurse:
   1303             d1 = self.do_create(tmp.name, pre, suf, recurse-1)
   1304             d1.name = None
   1305         with open(os.path.join(tmp.name, "test.txt"), "wb") as f:
   1306             f.write(b"Hello world!")
   1307         return tmp
   1308 
   1309     def test_mkdtemp_failure(self):
   1310         # Check no additional exception if mkdtemp fails
   1311         # Previously would raise AttributeError instead
   1312         # (noted as part of Issue #10188)
   1313         with tempfile.TemporaryDirectory() as nonexistent:
   1314             pass
   1315         with self.assertRaises(FileNotFoundError) as cm:
   1316             tempfile.TemporaryDirectory(dir=nonexistent)
   1317         self.assertEqual(cm.exception.errno, errno.ENOENT)
   1318 
   1319     def test_explicit_cleanup(self):
   1320         # A TemporaryDirectory is deleted when cleaned up
   1321         dir = tempfile.mkdtemp()
   1322         try:
   1323             d = self.do_create(dir=dir)
   1324             self.assertTrue(os.path.exists(d.name),
   1325                             "TemporaryDirectory %s does not exist" % d.name)
   1326             d.cleanup()
   1327             self.assertFalse(os.path.exists(d.name),
   1328                         "TemporaryDirectory %s exists after cleanup" % d.name)
   1329         finally:
   1330             os.rmdir(dir)
   1331 
   1332     @support.skip_unless_symlink
   1333     def test_cleanup_with_symlink_to_a_directory(self):
   1334         # cleanup() should not follow symlinks to directories (issue #12464)
   1335         d1 = self.do_create()
   1336         d2 = self.do_create(recurse=0)
   1337 
   1338         # Symlink d1/foo -> d2
   1339         os.symlink(d2.name, os.path.join(d1.name, "foo"))
   1340 
   1341         # This call to cleanup() should not follow the "foo" symlink
   1342         d1.cleanup()
   1343 
   1344         self.assertFalse(os.path.exists(d1.name),
   1345                          "TemporaryDirectory %s exists after cleanup" % d1.name)
   1346         self.assertTrue(os.path.exists(d2.name),
   1347                         "Directory pointed to by a symlink was deleted")
   1348         self.assertEqual(os.listdir(d2.name), ['test.txt'],
   1349                          "Contents of the directory pointed to by a symlink "
   1350                          "were deleted")
   1351         d2.cleanup()
   1352 
   1353     @support.cpython_only
   1354     def test_del_on_collection(self):
   1355         # A TemporaryDirectory is deleted when garbage collected
   1356         dir = tempfile.mkdtemp()
   1357         try:
   1358             d = self.do_create(dir=dir)
   1359             name = d.name
   1360             del d # Rely on refcounting to invoke __del__
   1361             self.assertFalse(os.path.exists(name),
   1362                         "TemporaryDirectory %s exists after __del__" % name)
   1363         finally:
   1364             os.rmdir(dir)
   1365 
   1366     def test_del_on_shutdown(self):
   1367         # A TemporaryDirectory may be cleaned up during shutdown
   1368         with self.do_create() as dir:
   1369             for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
   1370                 code = """if True:
   1371                     import builtins
   1372                     import os
   1373                     import shutil
   1374                     import sys
   1375                     import tempfile
   1376                     import warnings
   1377 
   1378                     tmp = tempfile.TemporaryDirectory(dir={dir!r})
   1379                     sys.stdout.buffer.write(tmp.name.encode())
   1380 
   1381                     tmp2 = os.path.join(tmp.name, 'test_dir')
   1382                     os.mkdir(tmp2)
   1383                     with open(os.path.join(tmp2, "test.txt"), "w") as f:
   1384                         f.write("Hello world!")
   1385 
   1386                     {mod}.tmp = tmp
   1387 
   1388                     warnings.filterwarnings("always", category=ResourceWarning)
   1389                     """.format(dir=dir, mod=mod)
   1390                 rc, out, err = script_helper.assert_python_ok("-c", code)
   1391                 tmp_name = out.decode().strip()
   1392                 self.assertFalse(os.path.exists(tmp_name),
   1393                             "TemporaryDirectory %s exists after cleanup" % tmp_name)
   1394                 err = err.decode('utf-8', 'backslashreplace')
   1395                 self.assertNotIn("Exception ", err)
   1396                 self.assertIn("ResourceWarning: Implicitly cleaning up", err)
   1397 
   1398     def test_exit_on_shutdown(self):
   1399         # Issue #22427
   1400         with self.do_create() as dir:
   1401             code = """if True:
   1402                 import sys
   1403                 import tempfile
   1404                 import warnings
   1405 
   1406                 def generator():
   1407                     with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
   1408                         yield tmp
   1409                 g = generator()
   1410                 sys.stdout.buffer.write(next(g).encode())
   1411 
   1412                 warnings.filterwarnings("always", category=ResourceWarning)
   1413                 """.format(dir=dir)
   1414             rc, out, err = script_helper.assert_python_ok("-c", code)
   1415             tmp_name = out.decode().strip()
   1416             self.assertFalse(os.path.exists(tmp_name),
   1417                         "TemporaryDirectory %s exists after cleanup" % tmp_name)
   1418             err = err.decode('utf-8', 'backslashreplace')
   1419             self.assertNotIn("Exception ", err)
   1420             self.assertIn("ResourceWarning: Implicitly cleaning up", err)
   1421 
   1422     def test_warnings_on_cleanup(self):
   1423         # ResourceWarning will be triggered by __del__
   1424         with self.do_create() as dir:
   1425             d = self.do_create(dir=dir, recurse=3)
   1426             name = d.name
   1427 
   1428             # Check for the resource warning
   1429             with support.check_warnings(('Implicitly', ResourceWarning), quiet=False):
   1430                 warnings.filterwarnings("always", category=ResourceWarning)
   1431                 del d
   1432                 support.gc_collect()
   1433             self.assertFalse(os.path.exists(name),
   1434                         "TemporaryDirectory %s exists after __del__" % name)
   1435 
   1436     def test_multiple_close(self):
   1437         # Can be cleaned-up many times without error
   1438         d = self.do_create()
   1439         d.cleanup()
   1440         d.cleanup()
   1441         d.cleanup()
   1442 
   1443     def test_context_manager(self):
   1444         # Can be used as a context manager
   1445         d = self.do_create()
   1446         with d as name:
   1447             self.assertTrue(os.path.exists(name))
   1448             self.assertEqual(name, d.name)
   1449         self.assertFalse(os.path.exists(name))
   1450 
   1451 
if __name__ == "__main__":
    # Run the test suite when this file is executed directly as a script.
    unittest.main()
   1454