Home | History | Annotate | Download | only in test
      1 import test.support
      2 
      3 # Skip tests if _multiprocessing wasn't built.
      4 test.support.import_module('_multiprocessing')
      5 # Skip tests if sem_open implementation is broken.
      6 test.support.import_module('multiprocessing.synchronize')
      7 # import threading after _multiprocessing to raise a more relevant error
      8 # message: "No module named _multiprocessing". _multiprocessing is not compiled
      9 # without thread support.
     10 test.support.import_module('threading')
     11 
     12 from test.support.script_helper import assert_python_ok
     13 
     14 import os
     15 import sys
     16 import threading
     17 import time
     18 import unittest
     19 import weakref
     20 
     21 from concurrent import futures
     22 from concurrent.futures._base import (
     23     PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
     24 from concurrent.futures.process import BrokenProcessPool
     25 
     26 
     27 def create_future(state=PENDING, exception=None, result=None):
     28     f = Future()
     29     f._state = state
     30     f._exception = exception
     31     f._result = result
     32     return f
     33 
     34 
# Module-level futures, one per state, shared by the tests below.
# Tests that change a future's state (e.g. test_cancel) build their own
# instances with create_future() instead of using these.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
     41 
     42 
     43 def mul(x, y):
     44     return x * y
     45 
     46 
     47 def sleep_and_raise(t):
     48     time.sleep(t)
     49     raise Exception('this is an exception')
     50 
     51 def sleep_and_print(t, msg):
     52     time.sleep(t)
     53     print(msg)
     54     sys.stdout.flush()
     55 
     56 
     57 class MyObject(object):
     58     def my_method(self):
     59         pass
     60 
     61 
     62 class ExecutorMixin:
     63     worker_count = 5
     64 
     65     def setUp(self):
     66         self.t1 = time.time()
     67         try:
     68             self.executor = self.executor_type(max_workers=self.worker_count)
     69         except NotImplementedError as e:
     70             self.skipTest(str(e))
     71         self._prime_executor()
     72 
     73     def tearDown(self):
     74         self.executor.shutdown(wait=True)
     75         dt = time.time() - self.t1
     76         if test.support.verbose:
     77             print("%.2fs" % dt, end=' ')
     78         self.assertLess(dt, 60, "synchronization issue: test lasted too long")
     79 
     80     def _prime_executor(self):
     81         # Make sure that the executor is ready to do work before running the
     82         # tests. This should reduce the probability of timeouts in the tests.
     83         futures = [self.executor.submit(time.sleep, 0.1)
     84                    for _ in range(self.worker_count)]
     85 
     86         for f in futures:
     87             f.result()
     88 
     89 
class ThreadPoolMixin(ExecutorMixin):
    # Makes ExecutorMixin.setUp build a ThreadPoolExecutor.
    executor_type = futures.ThreadPoolExecutor
     92 
     93 
class ProcessPoolMixin(ExecutorMixin):
    # Makes ExecutorMixin.setUp build a ProcessPoolExecutor.
    executor_type = futures.ProcessPoolExecutor
     96 
     97 
     98 class ExecutorShutdownTest:
     99     def test_run_after_shutdown(self):
    100         self.executor.shutdown()
    101         self.assertRaises(RuntimeError,
    102                           self.executor.submit,
    103                           pow, 2, 5)
    104 
    105     def test_interpreter_shutdown(self):
    106         # Test the atexit hook for shutdown of worker threads and processes
    107         rc, out, err = assert_python_ok('-c', """if 1:
    108             from concurrent.futures import {executor_type}
    109             from time import sleep
    110             from test.test_concurrent_futures import sleep_and_print
    111             t = {executor_type}(5)
    112             t.submit(sleep_and_print, 1.0, "apple")
    113             """.format(executor_type=self.executor_type.__name__))
    114         # Errors in atexit hooks don't change the process exit code, check
    115         # stderr manually.
    116         self.assertFalse(err)
    117         self.assertEqual(out.strip(), b"apple")
    118 
    119     def test_hang_issue12364(self):
    120         fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
    121         self.executor.shutdown()
    122         for f in fs:
    123             f.result()
    124 
    125 
    126 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    127     def _prime_executor(self):
    128         pass
    129 
    130     def test_threads_terminate(self):
    131         self.executor.submit(mul, 21, 2)
    132         self.executor.submit(mul, 6, 7)
    133         self.executor.submit(mul, 3, 14)
    134         self.assertEqual(len(self.executor._threads), 3)
    135         self.executor.shutdown()
    136         for t in self.executor._threads:
    137             t.join()
    138 
    139     def test_context_manager_shutdown(self):
    140         with futures.ThreadPoolExecutor(max_workers=5) as e:
    141             executor = e
    142             self.assertEqual(list(e.map(abs, range(-5, 5))),
    143                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
    144 
    145         for t in executor._threads:
    146             t.join()
    147 
    148     def test_del_shutdown(self):
    149         executor = futures.ThreadPoolExecutor(max_workers=5)
    150         executor.map(abs, range(-5, 5))
    151         threads = executor._threads
    152         del executor
    153 
    154         for t in threads:
    155             t.join()
    156 
    157     def test_thread_names_assigned(self):
    158         executor = futures.ThreadPoolExecutor(
    159             max_workers=5, thread_name_prefix='SpecialPool')
    160         executor.map(abs, range(-5, 5))
    161         threads = executor._threads
    162         del executor
    163 
    164         for t in threads:
    165             self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
    166             t.join()
    167 
    168     def test_thread_names_default(self):
    169         executor = futures.ThreadPoolExecutor(max_workers=5)
    170         executor.map(abs, range(-5, 5))
    171         threads = executor._threads
    172         del executor
    173 
    174         for t in threads:
    175             # We don't particularly care what the default name is, just that
    176             # it has a default name implying that it is a ThreadPoolExecutor
    177             # followed by what looks like a thread number.
    178             self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
    179             t.join()
    180 
    181 
    182 class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    183     def _prime_executor(self):
    184         pass
    185 
    186     def test_processes_terminate(self):
    187         self.executor.submit(mul, 21, 2)
    188         self.executor.submit(mul, 6, 7)
    189         self.executor.submit(mul, 3, 14)
    190         self.assertEqual(len(self.executor._processes), 5)
    191         processes = self.executor._processes
    192         self.executor.shutdown()
    193 
    194         for p in processes.values():
    195             p.join()
    196 
    197     def test_context_manager_shutdown(self):
    198         with futures.ProcessPoolExecutor(max_workers=5) as e:
    199             processes = e._processes
    200             self.assertEqual(list(e.map(abs, range(-5, 5))),
    201                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
    202 
    203         for p in processes.values():
    204             p.join()
    205 
    206     def test_del_shutdown(self):
    207         executor = futures.ProcessPoolExecutor(max_workers=5)
    208         list(executor.map(abs, range(-5, 5)))
    209         queue_management_thread = executor._queue_management_thread
    210         processes = executor._processes
    211         del executor
    212 
    213         queue_management_thread.join()
    214         for p in processes.values():
    215             p.join()
    216 
    217 
    218 class WaitTests:
    219 
    220     def test_first_completed(self):
    221         future1 = self.executor.submit(mul, 21, 2)
    222         future2 = self.executor.submit(time.sleep, 1.5)
    223 
    224         done, not_done = futures.wait(
    225                 [CANCELLED_FUTURE, future1, future2],
    226                  return_when=futures.FIRST_COMPLETED)
    227 
    228         self.assertEqual(set([future1]), done)
    229         self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
    230 
    231     def test_first_completed_some_already_completed(self):
    232         future1 = self.executor.submit(time.sleep, 1.5)
    233 
    234         finished, pending = futures.wait(
    235                  [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
    236                  return_when=futures.FIRST_COMPLETED)
    237 
    238         self.assertEqual(
    239                 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
    240                 finished)
    241         self.assertEqual(set([future1]), pending)
    242 
    243     def test_first_exception(self):
    244         future1 = self.executor.submit(mul, 2, 21)
    245         future2 = self.executor.submit(sleep_and_raise, 1.5)
    246         future3 = self.executor.submit(time.sleep, 3)
    247 
    248         finished, pending = futures.wait(
    249                 [future1, future2, future3],
    250                 return_when=futures.FIRST_EXCEPTION)
    251 
    252         self.assertEqual(set([future1, future2]), finished)
    253         self.assertEqual(set([future3]), pending)
    254 
    255     def test_first_exception_some_already_complete(self):
    256         future1 = self.executor.submit(divmod, 21, 0)
    257         future2 = self.executor.submit(time.sleep, 1.5)
    258 
    259         finished, pending = futures.wait(
    260                 [SUCCESSFUL_FUTURE,
    261                  CANCELLED_FUTURE,
    262                  CANCELLED_AND_NOTIFIED_FUTURE,
    263                  future1, future2],
    264                 return_when=futures.FIRST_EXCEPTION)
    265 
    266         self.assertEqual(set([SUCCESSFUL_FUTURE,
    267                               CANCELLED_AND_NOTIFIED_FUTURE,
    268                               future1]), finished)
    269         self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
    270 
    271     def test_first_exception_one_already_failed(self):
    272         future1 = self.executor.submit(time.sleep, 2)
    273 
    274         finished, pending = futures.wait(
    275                  [EXCEPTION_FUTURE, future1],
    276                  return_when=futures.FIRST_EXCEPTION)
    277 
    278         self.assertEqual(set([EXCEPTION_FUTURE]), finished)
    279         self.assertEqual(set([future1]), pending)
    280 
    281     def test_all_completed(self):
    282         future1 = self.executor.submit(divmod, 2, 0)
    283         future2 = self.executor.submit(mul, 2, 21)
    284 
    285         finished, pending = futures.wait(
    286                 [SUCCESSFUL_FUTURE,
    287                  CANCELLED_AND_NOTIFIED_FUTURE,
    288                  EXCEPTION_FUTURE,
    289                  future1,
    290                  future2],
    291                 return_when=futures.ALL_COMPLETED)
    292 
    293         self.assertEqual(set([SUCCESSFUL_FUTURE,
    294                               CANCELLED_AND_NOTIFIED_FUTURE,
    295                               EXCEPTION_FUTURE,
    296                               future1,
    297                               future2]), finished)
    298         self.assertEqual(set(), pending)
    299 
    300     def test_timeout(self):
    301         future1 = self.executor.submit(mul, 6, 7)
    302         future2 = self.executor.submit(time.sleep, 6)
    303 
    304         finished, pending = futures.wait(
    305                 [CANCELLED_AND_NOTIFIED_FUTURE,
    306                  EXCEPTION_FUTURE,
    307                  SUCCESSFUL_FUTURE,
    308                  future1, future2],
    309                 timeout=5,
    310                 return_when=futures.ALL_COMPLETED)
    311 
    312         self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
    313                               EXCEPTION_FUTURE,
    314                               SUCCESSFUL_FUTURE,
    315                               future1]), finished)
    316         self.assertEqual(set([future2]), pending)
    317 
    318 
    319 class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
    320 
    321     def test_pending_calls_race(self):
    322         # Issue #14406: multi-threaded race condition when waiting on all
    323         # futures.
    324         event = threading.Event()
    325         def future_func():
    326             event.wait()
    327         oldswitchinterval = sys.getswitchinterval()
    328         sys.setswitchinterval(1e-6)
    329         try:
    330             fs = {self.executor.submit(future_func) for i in range(100)}
    331             event.set()
    332             futures.wait(fs, return_when=futures.ALL_COMPLETED)
    333         finally:
    334             sys.setswitchinterval(oldswitchinterval)
    335 
    336 
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
    # Runs the inherited WaitTests with a ProcessPoolExecutor; adds no tests.
    pass
    339 
    340 
    341 class AsCompletedTests:
    342     # TODO(brian (at] sweetapp.com): Should have a test with a non-zero timeout.
    343     def test_no_timeout(self):
    344         future1 = self.executor.submit(mul, 2, 21)
    345         future2 = self.executor.submit(mul, 7, 6)
    346 
    347         completed = set(futures.as_completed(
    348                 [CANCELLED_AND_NOTIFIED_FUTURE,
    349                  EXCEPTION_FUTURE,
    350                  SUCCESSFUL_FUTURE,
    351                  future1, future2]))
    352         self.assertEqual(set(
    353                 [CANCELLED_AND_NOTIFIED_FUTURE,
    354                  EXCEPTION_FUTURE,
    355                  SUCCESSFUL_FUTURE,
    356                  future1, future2]),
    357                 completed)
    358 
    359     def test_zero_timeout(self):
    360         future1 = self.executor.submit(time.sleep, 2)
    361         completed_futures = set()
    362         try:
    363             for future in futures.as_completed(
    364                     [CANCELLED_AND_NOTIFIED_FUTURE,
    365                      EXCEPTION_FUTURE,
    366                      SUCCESSFUL_FUTURE,
    367                      future1],
    368                     timeout=0):
    369                 completed_futures.add(future)
    370         except futures.TimeoutError:
    371             pass
    372 
    373         self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
    374                               EXCEPTION_FUTURE,
    375                               SUCCESSFUL_FUTURE]),
    376                          completed_futures)
    377 
    378     def test_duplicate_futures(self):
    379         # Issue 20367. Duplicate futures should not raise exceptions or give
    380         # duplicate responses.
    381         future1 = self.executor.submit(time.sleep, 2)
    382         completed = [f for f in futures.as_completed([future1,future1])]
    383         self.assertEqual(len(completed), 1)
    384 
    385 
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
    # Runs the inherited AsCompletedTests with a ThreadPoolExecutor.
    pass
    388 
    389 
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
    # Runs the inherited AsCompletedTests with a ProcessPoolExecutor.
    pass
    392 
    393 
    394 class ExecutorTest:
    395     # Executor.shutdown() and context manager usage is tested by
    396     # ExecutorShutdownTest.
    397     def test_submit(self):
    398         future = self.executor.submit(pow, 2, 8)
    399         self.assertEqual(256, future.result())
    400 
    401     def test_submit_keyword(self):
    402         future = self.executor.submit(mul, 2, y=8)
    403         self.assertEqual(16, future.result())
    404 
    405     def test_map(self):
    406         self.assertEqual(
    407                 list(self.executor.map(pow, range(10), range(10))),
    408                 list(map(pow, range(10), range(10))))
    409 
    410     def test_map_exception(self):
    411         i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
    412         self.assertEqual(i.__next__(), (0, 1))
    413         self.assertEqual(i.__next__(), (0, 1))
    414         self.assertRaises(ZeroDivisionError, i.__next__)
    415 
    416     def test_map_timeout(self):
    417         results = []
    418         try:
    419             for i in self.executor.map(time.sleep,
    420                                        [0, 0, 6],
    421                                        timeout=5):
    422                 results.append(i)
    423         except futures.TimeoutError:
    424             pass
    425         else:
    426             self.fail('expected TimeoutError')
    427 
    428         self.assertEqual([None, None], results)
    429 
    430     def test_shutdown_race_issue12456(self):
    431         # Issue #12456: race condition at shutdown where trying to post a
    432         # sentinel in the call queue blocks (the queue is full while processes
    433         # have exited).
    434         self.executor.map(str, [2] * (self.worker_count + 1))
    435         self.executor.shutdown()
    436 
    437     @test.support.cpython_only
    438     def test_no_stale_references(self):
    439         # Issue #16284: check that the executors don't unnecessarily hang onto
    440         # references.
    441         my_object = MyObject()
    442         my_object_collected = threading.Event()
    443         my_object_callback = weakref.ref(
    444             my_object, lambda obj: my_object_collected.set())
    445         # Deliberately discarding the future.
    446         self.executor.submit(my_object.my_method)
    447         del my_object
    448 
    449         collected = my_object_collected.wait(timeout=5.0)
    450         self.assertTrue(collected,
    451                         "Stale reference not collected within timeout.")
    452 
    453     def test_max_workers_negative(self):
    454         for number in (0, -1):
    455             with self.assertRaisesRegex(ValueError,
    456                                         "max_workers must be greater "
    457                                         "than 0"):
    458                 self.executor_type(max_workers=number)
    459 
    460 
    461 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
    462     def test_map_submits_without_iteration(self):
    463         """Tests verifying issue 11777."""
    464         finished = []
    465         def record_finished(n):
    466             finished.append(n)
    467 
    468         self.executor.map(record_finished, range(10))
    469         self.executor.shutdown(wait=True)
    470         self.assertCountEqual(finished, range(10))
    471 
    472     def test_default_workers(self):
    473         executor = self.executor_type()
    474         self.assertEqual(executor._max_workers,
    475                          (os.cpu_count() or 1) * 5)
    476 
    477 
    478 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
    479     def test_killed_child(self):
    480         # When a child process is abruptly terminated, the whole pool gets
    481         # "broken".
    482         futures = [self.executor.submit(time.sleep, 3)]
    483         # Get one of the processes, and terminate (kill) it
    484         p = next(iter(self.executor._processes.values()))
    485         p.terminate()
    486         for fut in futures:
    487             self.assertRaises(BrokenProcessPool, fut.result)
    488         # Submitting other jobs fails as well.
    489         self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
    490 
    491     def test_map_chunksize(self):
    492         def bad_map():
    493             list(self.executor.map(pow, range(40), range(40), chunksize=-1))
    494 
    495         ref = list(map(pow, range(40), range(40)))
    496         self.assertEqual(
    497             list(self.executor.map(pow, range(40), range(40), chunksize=6)),
    498             ref)
    499         self.assertEqual(
    500             list(self.executor.map(pow, range(40), range(40), chunksize=50)),
    501             ref)
    502         self.assertEqual(
    503             list(self.executor.map(pow, range(40), range(40), chunksize=40)),
    504             ref)
    505         self.assertRaises(ValueError, bad_map)
    506 
    507     @classmethod
    508     def _test_traceback(cls):
    509         raise RuntimeError(123) # some comment
    510 
    511     def test_traceback(self):
    512         # We want ensure that the traceback from the child process is
    513         # contained in the traceback raised in the main process.
    514         future = self.executor.submit(self._test_traceback)
    515         with self.assertRaises(Exception) as cm:
    516             future.result()
    517 
    518         exc = cm.exception
    519         self.assertIs(type(exc), RuntimeError)
    520         self.assertEqual(exc.args, (123,))
    521         cause = exc.__cause__
    522         self.assertIs(type(cause), futures.process._RemoteTraceback)
    523         self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
    524 
    525         with test.support.captured_stderr() as f1:
    526             try:
    527                 raise exc
    528             except RuntimeError:
    529                 sys.excepthook(*sys.exc_info())
    530         self.assertIn('raise RuntimeError(123) # some comment',
    531                       f1.getvalue())
    532 
    533 
    534 class FutureTests(unittest.TestCase):
    535     def test_done_callback_with_result(self):
    536         callback_result = None
    537         def fn(callback_future):
    538             nonlocal callback_result
    539             callback_result = callback_future.result()
    540 
    541         f = Future()
    542         f.add_done_callback(fn)
    543         f.set_result(5)
    544         self.assertEqual(5, callback_result)
    545 
    546     def test_done_callback_with_exception(self):
    547         callback_exception = None
    548         def fn(callback_future):
    549             nonlocal callback_exception
    550             callback_exception = callback_future.exception()
    551 
    552         f = Future()
    553         f.add_done_callback(fn)
    554         f.set_exception(Exception('test'))
    555         self.assertEqual(('test',), callback_exception.args)
    556 
    557     def test_done_callback_with_cancel(self):
    558         was_cancelled = None
    559         def fn(callback_future):
    560             nonlocal was_cancelled
    561             was_cancelled = callback_future.cancelled()
    562 
    563         f = Future()
    564         f.add_done_callback(fn)
    565         self.assertTrue(f.cancel())
    566         self.assertTrue(was_cancelled)
    567 
    568     def test_done_callback_raises(self):
    569         with test.support.captured_stderr() as stderr:
    570             raising_was_called = False
    571             fn_was_called = False
    572 
    573             def raising_fn(callback_future):
    574                 nonlocal raising_was_called
    575                 raising_was_called = True
    576                 raise Exception('doh!')
    577 
    578             def fn(callback_future):
    579                 nonlocal fn_was_called
    580                 fn_was_called = True
    581 
    582             f = Future()
    583             f.add_done_callback(raising_fn)
    584             f.add_done_callback(fn)
    585             f.set_result(5)
    586             self.assertTrue(raising_was_called)
    587             self.assertTrue(fn_was_called)
    588             self.assertIn('Exception: doh!', stderr.getvalue())
    589 
    590     def test_done_callback_already_successful(self):
    591         callback_result = None
    592         def fn(callback_future):
    593             nonlocal callback_result
    594             callback_result = callback_future.result()
    595 
    596         f = Future()
    597         f.set_result(5)
    598         f.add_done_callback(fn)
    599         self.assertEqual(5, callback_result)
    600 
    601     def test_done_callback_already_failed(self):
    602         callback_exception = None
    603         def fn(callback_future):
    604             nonlocal callback_exception
    605             callback_exception = callback_future.exception()
    606 
    607         f = Future()
    608         f.set_exception(Exception('test'))
    609         f.add_done_callback(fn)
    610         self.assertEqual(('test',), callback_exception.args)
    611 
    612     def test_done_callback_already_cancelled(self):
    613         was_cancelled = None
    614         def fn(callback_future):
    615             nonlocal was_cancelled
    616             was_cancelled = callback_future.cancelled()
    617 
    618         f = Future()
    619         self.assertTrue(f.cancel())
    620         f.add_done_callback(fn)
    621         self.assertTrue(was_cancelled)
    622 
    623     def test_repr(self):
    624         self.assertRegex(repr(PENDING_FUTURE),
    625                          '<Future at 0x[0-9a-f]+ state=pending>')
    626         self.assertRegex(repr(RUNNING_FUTURE),
    627                          '<Future at 0x[0-9a-f]+ state=running>')
    628         self.assertRegex(repr(CANCELLED_FUTURE),
    629                          '<Future at 0x[0-9a-f]+ state=cancelled>')
    630         self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
    631                          '<Future at 0x[0-9a-f]+ state=cancelled>')
    632         self.assertRegex(
    633                 repr(EXCEPTION_FUTURE),
    634                 '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
    635         self.assertRegex(
    636                 repr(SUCCESSFUL_FUTURE),
    637                 '<Future at 0x[0-9a-f]+ state=finished returned int>')
    638 
    639 
    640     def test_cancel(self):
    641         f1 = create_future(state=PENDING)
    642         f2 = create_future(state=RUNNING)
    643         f3 = create_future(state=CANCELLED)
    644         f4 = create_future(state=CANCELLED_AND_NOTIFIED)
    645         f5 = create_future(state=FINISHED, exception=OSError())
    646         f6 = create_future(state=FINISHED, result=5)
    647 
    648         self.assertTrue(f1.cancel())
    649         self.assertEqual(f1._state, CANCELLED)
    650 
    651         self.assertFalse(f2.cancel())
    652         self.assertEqual(f2._state, RUNNING)
    653 
    654         self.assertTrue(f3.cancel())
    655         self.assertEqual(f3._state, CANCELLED)
    656 
    657         self.assertTrue(f4.cancel())
    658         self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
    659 
    660         self.assertFalse(f5.cancel())
    661         self.assertEqual(f5._state, FINISHED)
    662 
    663         self.assertFalse(f6.cancel())
    664         self.assertEqual(f6._state, FINISHED)
    665 
    666     def test_cancelled(self):
    667         self.assertFalse(PENDING_FUTURE.cancelled())
    668         self.assertFalse(RUNNING_FUTURE.cancelled())
    669         self.assertTrue(CANCELLED_FUTURE.cancelled())
    670         self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
    671         self.assertFalse(EXCEPTION_FUTURE.cancelled())
    672         self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
    673 
    674     def test_done(self):
    675         self.assertFalse(PENDING_FUTURE.done())
    676         self.assertFalse(RUNNING_FUTURE.done())
    677         self.assertTrue(CANCELLED_FUTURE.done())
    678         self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
    679         self.assertTrue(EXCEPTION_FUTURE.done())
    680         self.assertTrue(SUCCESSFUL_FUTURE.done())
    681 
    682     def test_running(self):
    683         self.assertFalse(PENDING_FUTURE.running())
    684         self.assertTrue(RUNNING_FUTURE.running())
    685         self.assertFalse(CANCELLED_FUTURE.running())
    686         self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
    687         self.assertFalse(EXCEPTION_FUTURE.running())
    688         self.assertFalse(SUCCESSFUL_FUTURE.running())
    689 
    690     def test_result_with_timeout(self):
    691         self.assertRaises(futures.TimeoutError,
    692                           PENDING_FUTURE.result, timeout=0)
    693         self.assertRaises(futures.TimeoutError,
    694                           RUNNING_FUTURE.result, timeout=0)
    695         self.assertRaises(futures.CancelledError,
    696                           CANCELLED_FUTURE.result, timeout=0)
    697         self.assertRaises(futures.CancelledError,
    698                           CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
    699         self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
    700         self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
    701 
    702     def test_result_with_success(self):
    703         # TODO(brian (at] sweetapp.com): This test is timing dependent.
    704         def notification():
    705             # Wait until the main thread is waiting for the result.
    706             time.sleep(1)
    707             f1.set_result(42)
    708 
    709         f1 = create_future(state=PENDING)
    710         t = threading.Thread(target=notification)
    711         t.start()
    712 
    713         self.assertEqual(f1.result(timeout=5), 42)
    714 
    715     def test_result_with_cancel(self):
    716         # TODO(brian (at] sweetapp.com): This test is timing dependent.
    717         def notification():
    718             # Wait until the main thread is waiting for the result.
    719             time.sleep(1)
    720             f1.cancel()
    721 
    722         f1 = create_future(state=PENDING)
    723         t = threading.Thread(target=notification)
    724         t.start()
    725 
    726         self.assertRaises(futures.CancelledError, f1.result, timeout=5)
    727 
    728     def test_exception_with_timeout(self):
    729         self.assertRaises(futures.TimeoutError,
    730                           PENDING_FUTURE.exception, timeout=0)
    731         self.assertRaises(futures.TimeoutError,
    732                           RUNNING_FUTURE.exception, timeout=0)
    733         self.assertRaises(futures.CancelledError,
    734                           CANCELLED_FUTURE.exception, timeout=0)
    735         self.assertRaises(futures.CancelledError,
    736                           CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
    737         self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
    738                                    OSError))
    739         self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
    740 
    741     def test_exception_with_success(self):
    742         def notification():
    743             # Wait until the main thread is waiting for the exception.
    744             time.sleep(1)
    745             with f1._condition:
    746                 f1._state = FINISHED
    747                 f1._exception = OSError()
    748                 f1._condition.notify_all()
    749 
    750         f1 = create_future(state=PENDING)
    751         t = threading.Thread(target=notification)
    752         t.start()
    753 
    754         self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
    755 
@test.support.reap_threads
def test_main():
    """Run this module's tests through test.support.run_unittest()."""
    try:
        test.support.run_unittest(__name__)
    finally:
        # Always call reap_children(), even if the test run raised.
        test.support.reap_children()
    762 
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
    765