Home | History | Annotate | Download | only in test
      1 """
      2 Create and delete FILES_PER_THREAD temp files (via tempfile.TemporaryFile)
      3 in each of NUM_THREADS threads, recording the number of successes and
      4 failures.  A failure is a bug in tempfile, and may be due to:
      5 
      6 + Trying to create more than one tempfile with the same name.
      7 + Trying to delete a tempfile that doesn't still exist.
      8 + Something we've never seen before.
      9 
     10 By default, NUM_THREADS == 20 and FILES_PER_THREAD == 50.  This is enough to
     11 create about 150 failures per run under Win98SE in 2.0, and runs pretty
     12 quickly. Guido reports needing to boost FILES_PER_THREAD to 500 before
     13 provoking a 2.0 failure under Linux.
     14 """
     15 
# Number of TempFileGreedy worker threads the test creates.
NUM_THREADS = 20
# Number of temporary files each worker thread creates and closes.
FILES_PER_THREAD = 50
     18 
     19 import tempfile
     20 
     21 from test.support import start_threads, import_module
     22 import unittest
     23 import io
     24 import threading
     25 from traceback import print_exc
     26 
# Gate shared by all workers: each TempFileGreedy.run() waits on this event
# before touching tempfile.  The test passes startEvent.set to start_threads,
# presumably so every worker is released at once -- confirm against
# test.support.start_threads.
startEvent = threading.Event()
     28 
     29 class TempFileGreedy(threading.Thread):
     30     error_count = 0
     31     ok_count = 0
     32 
     33     def run(self):
     34         self.errors = io.StringIO()
     35         startEvent.wait()
     36         for i in range(FILES_PER_THREAD):
     37             try:
     38                 f = tempfile.TemporaryFile("w+b")
     39                 f.close()
     40             except:
     41                 self.error_count += 1
     42                 print_exc(file=self.errors)
     43             else:
     44                 self.ok_count += 1
     45 
     46 
     47 class ThreadedTempFileTest(unittest.TestCase):
     48     def test_main(self):
     49         threads = [TempFileGreedy() for i in range(NUM_THREADS)]
     50         with start_threads(threads, startEvent.set):
     51             pass
     52         ok = sum(t.ok_count for t in threads)
     53         errors = [str(t.name) + str(t.errors.getvalue())
     54                   for t in threads if t.error_count]
     55 
     56         msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
     57             '\n'.join(errors))
     58         self.assertEqual(errors, [], msg)
     59         self.assertEqual(ok, NUM_THREADS * FILES_PER_THREAD)
     60 
# Run the test case when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
     63