Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import support
      3 from contextlib import closing
      4 import enum
      5 import gc
      6 import pickle
      7 import select
      8 import signal
      9 import socket
     10 import struct
     11 import subprocess
     12 import traceback
     13 import sys, os, time, errno
     14 from test.support.script_helper import assert_python_ok, spawn_python
     15 try:
     16     import threading
     17 except ImportError:
     18     threading = None
     19 try:
     20     import _testcapi
     21 except ImportError:
     22     _testcapi = None
     23 
     24 
     25 class GenericTests(unittest.TestCase):
     26 
     27     @unittest.skipIf(threading is None, "test needs threading module")
     28     def test_enums(self):
     29         for name in dir(signal):
     30             sig = getattr(signal, name)
     31             if name in {'SIG_DFL', 'SIG_IGN'}:
     32                 self.assertIsInstance(sig, signal.Handlers)
     33             elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
     34                 self.assertIsInstance(sig, signal.Sigmasks)
     35             elif name.startswith('SIG') and not name.startswith('SIG_'):
     36                 self.assertIsInstance(sig, signal.Signals)
     37             elif name.startswith('CTRL_'):
     38                 self.assertIsInstance(sig, signal.Signals)
     39                 self.assertEqual(sys.platform, "win32")
     40 
     41 
     42 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     43 class PosixTests(unittest.TestCase):
     44     def trivial_signal_handler(self, *args):
     45         pass
     46 
     47     def test_out_of_range_signal_number_raises_error(self):
     48         self.assertRaises(ValueError, signal.getsignal, 4242)
     49 
     50         self.assertRaises(ValueError, signal.signal, 4242,
     51                           self.trivial_signal_handler)
     52 
     53     def test_setting_signal_handler_to_none_raises_error(self):
     54         self.assertRaises(TypeError, signal.signal,
     55                           signal.SIGUSR1, None)
     56 
     57     def test_getsignal(self):
     58         hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
     59         self.assertIsInstance(hup, signal.Handlers)
     60         self.assertEqual(signal.getsignal(signal.SIGHUP),
     61                          self.trivial_signal_handler)
     62         signal.signal(signal.SIGHUP, hup)
     63         self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
     64 
     65     # Issue 3864, unknown if this affects earlier versions of freebsd also
     66     @unittest.skipIf(sys.platform=='freebsd6',
     67         'inter process signals not reliable (do not mix well with threading) '
     68         'on freebsd6')
     69     def test_interprocess_signal(self):
     70         dirname = os.path.dirname(__file__)
     71         script = os.path.join(dirname, 'signalinterproctester.py')
     72         assert_python_ok(script)
     73 
     74 
     75 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
     76 class WindowsSignalTests(unittest.TestCase):
     77     def test_issue9324(self):
     78         # Updated for issue #10003, adding SIGBREAK
     79         handler = lambda x, y: None
     80         checked = set()
     81         for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
     82                     signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
     83                     signal.SIGTERM):
     84             # Set and then reset a handler for signals that work on windows.
     85             # Issue #18396, only for signals without a C-level handler.
     86             if signal.getsignal(sig) is not None:
     87                 signal.signal(sig, signal.signal(sig, handler))
     88                 checked.add(sig)
     89         # Issue #18396: Ensure the above loop at least tested *something*
     90         self.assertTrue(checked)
     91 
     92         with self.assertRaises(ValueError):
     93             signal.signal(-1, handler)
     94 
     95         with self.assertRaises(ValueError):
     96             signal.signal(7, handler)
     97 
     98 
     99 class WakeupFDTests(unittest.TestCase):
    100 
    101     def test_invalid_fd(self):
    102         fd = support.make_bad_fd()
    103         self.assertRaises((ValueError, OSError),
    104                           signal.set_wakeup_fd, fd)
    105 
    106     def test_invalid_socket(self):
    107         sock = socket.socket()
    108         fd = sock.fileno()
    109         sock.close()
    110         self.assertRaises((ValueError, OSError),
    111                           signal.set_wakeup_fd, fd)
    112 
    113     def test_set_wakeup_fd_result(self):
    114         r1, w1 = os.pipe()
    115         self.addCleanup(os.close, r1)
    116         self.addCleanup(os.close, w1)
    117         r2, w2 = os.pipe()
    118         self.addCleanup(os.close, r2)
    119         self.addCleanup(os.close, w2)
    120 
    121         if hasattr(os, 'set_blocking'):
    122             os.set_blocking(w1, False)
    123             os.set_blocking(w2, False)
    124 
    125         signal.set_wakeup_fd(w1)
    126         self.assertEqual(signal.set_wakeup_fd(w2), w1)
    127         self.assertEqual(signal.set_wakeup_fd(-1), w2)
    128         self.assertEqual(signal.set_wakeup_fd(-1), -1)
    129 
    130     def test_set_wakeup_fd_socket_result(self):
    131         sock1 = socket.socket()
    132         self.addCleanup(sock1.close)
    133         sock1.setblocking(False)
    134         fd1 = sock1.fileno()
    135 
    136         sock2 = socket.socket()
    137         self.addCleanup(sock2.close)
    138         sock2.setblocking(False)
    139         fd2 = sock2.fileno()
    140 
    141         signal.set_wakeup_fd(fd1)
    142         self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
    143         self.assertEqual(signal.set_wakeup_fd(-1), fd2)
    144         self.assertEqual(signal.set_wakeup_fd(-1), -1)
    145 
    146     # On Windows, files are always blocking and Windows does not provide a
    147     # function to test if a socket is in non-blocking mode.
    148     @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
    149     def test_set_wakeup_fd_blocking(self):
    150         rfd, wfd = os.pipe()
    151         self.addCleanup(os.close, rfd)
    152         self.addCleanup(os.close, wfd)
    153 
    154         # fd must be non-blocking
    155         os.set_blocking(wfd, True)
    156         with self.assertRaises(ValueError) as cm:
    157             signal.set_wakeup_fd(wfd)
    158         self.assertEqual(str(cm.exception),
    159                          "the fd %s must be in non-blocking mode" % wfd)
    160 
    161         # non-blocking is ok
    162         os.set_blocking(wfd, False)
    163         signal.set_wakeup_fd(wfd)
    164         signal.set_wakeup_fd(-1)
    165 
    166 
    167 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    168 class WakeupSignalTests(unittest.TestCase):
    169     @unittest.skipIf(_testcapi is None, 'need _testcapi')
    170     def check_wakeup(self, test_body, *signals, ordered=True):
    171         # use a subprocess to have only one thread
    172         code = """if 1:
    173         import _testcapi
    174         import os
    175         import signal
    176         import struct
    177 
    178         signals = {!r}
    179 
    180         def handler(signum, frame):
    181             pass
    182 
    183         def check_signum(signals):
    184             data = os.read(read, len(signals)+1)
    185             raised = struct.unpack('%uB' % len(data), data)
    186             if not {!r}:
    187                 raised = set(raised)
    188                 signals = set(signals)
    189             if raised != signals:
    190                 raise Exception("%r != %r" % (raised, signals))
    191 
    192         {}
    193 
    194         signal.signal(signal.SIGALRM, handler)
    195         read, write = os.pipe()
    196         os.set_blocking(write, False)
    197         signal.set_wakeup_fd(write)
    198 
    199         test()
    200         check_signum(signals)
    201 
    202         os.close(read)
    203         os.close(write)
    204         """.format(tuple(map(int, signals)), ordered, test_body)
    205 
    206         assert_python_ok('-c', code)
    207 
    208     @unittest.skipIf(_testcapi is None, 'need _testcapi')
    209     def test_wakeup_write_error(self):
    210         # Issue #16105: write() errors in the C signal handler should not
    211         # pass silently.
    212         # Use a subprocess to have only one thread.
    213         code = """if 1:
    214         import _testcapi
    215         import errno
    216         import os
    217         import signal
    218         import sys
    219         from test.support import captured_stderr
    220 
    221         def handler(signum, frame):
    222             1/0
    223 
    224         signal.signal(signal.SIGALRM, handler)
    225         r, w = os.pipe()
    226         os.set_blocking(r, False)
    227 
    228         # Set wakeup_fd a read-only file descriptor to trigger the error
    229         signal.set_wakeup_fd(r)
    230         try:
    231             with captured_stderr() as err:
    232                 _testcapi.raise_signal(signal.SIGALRM)
    233         except ZeroDivisionError:
    234             # An ignored exception should have been printed out on stderr
    235             err = err.getvalue()
    236             if ('Exception ignored when trying to write to the signal wakeup fd'
    237                 not in err):
    238                 raise AssertionError(err)
    239             if ('OSError: [Errno %d]' % errno.EBADF) not in err:
    240                 raise AssertionError(err)
    241         else:
    242             raise AssertionError("ZeroDivisionError not raised")
    243 
    244         os.close(r)
    245         os.close(w)
    246         """
    247         r, w = os.pipe()
    248         try:
    249             os.write(r, b'x')
    250         except OSError:
    251             pass
    252         else:
    253             self.skipTest("OS doesn't report write() error on the read end of a pipe")
    254         finally:
    255             os.close(r)
    256             os.close(w)
    257 
    258         assert_python_ok('-c', code)
    259 
    260     def test_wakeup_fd_early(self):
    261         self.check_wakeup("""def test():
    262             import select
    263             import time
    264 
    265             TIMEOUT_FULL = 10
    266             TIMEOUT_HALF = 5
    267 
    268             class InterruptSelect(Exception):
    269                 pass
    270 
    271             def handler(signum, frame):
    272                 raise InterruptSelect
    273             signal.signal(signal.SIGALRM, handler)
    274 
    275             signal.alarm(1)
    276 
    277             # We attempt to get a signal during the sleep,
    278             # before select is called
    279             try:
    280                 select.select([], [], [], TIMEOUT_FULL)
    281             except InterruptSelect:
    282                 pass
    283             else:
    284                 raise Exception("select() was not interrupted")
    285 
    286             before_time = time.monotonic()
    287             select.select([read], [], [], TIMEOUT_FULL)
    288             after_time = time.monotonic()
    289             dt = after_time - before_time
    290             if dt >= TIMEOUT_HALF:
    291                 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
    292         """, signal.SIGALRM)
    293 
    294     def test_wakeup_fd_during(self):
    295         self.check_wakeup("""def test():
    296             import select
    297             import time
    298 
    299             TIMEOUT_FULL = 10
    300             TIMEOUT_HALF = 5
    301 
    302             class InterruptSelect(Exception):
    303                 pass
    304 
    305             def handler(signum, frame):
    306                 raise InterruptSelect
    307             signal.signal(signal.SIGALRM, handler)
    308 
    309             signal.alarm(1)
    310             before_time = time.monotonic()
    311             # We attempt to get a signal during the select call
    312             try:
    313                 select.select([read], [], [], TIMEOUT_FULL)
    314             except InterruptSelect:
    315                 pass
    316             else:
    317                 raise Exception("select() was not interrupted")
    318             after_time = time.monotonic()
    319             dt = after_time - before_time
    320             if dt >= TIMEOUT_HALF:
    321                 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
    322         """, signal.SIGALRM)
    323 
    324     def test_signum(self):
    325         self.check_wakeup("""def test():
    326             import _testcapi
    327             signal.signal(signal.SIGUSR1, handler)
    328             _testcapi.raise_signal(signal.SIGUSR1)
    329             _testcapi.raise_signal(signal.SIGALRM)
    330         """, signal.SIGUSR1, signal.SIGALRM)
    331 
    332     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    333                          'need signal.pthread_sigmask()')
    334     def test_pending(self):
    335         self.check_wakeup("""def test():
    336             signum1 = signal.SIGUSR1
    337             signum2 = signal.SIGUSR2
    338 
    339             signal.signal(signum1, handler)
    340             signal.signal(signum2, handler)
    341 
    342             signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
    343             _testcapi.raise_signal(signum1)
    344             _testcapi.raise_signal(signum2)
    345             # Unblocking the 2 signals calls the C signal handler twice
    346             signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
    347         """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
    348 
    349 
    350 @unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
    351 class WakeupSocketSignalTests(unittest.TestCase):
    352 
    353     @unittest.skipIf(_testcapi is None, 'need _testcapi')
    354     def test_socket(self):
    355         # use a subprocess to have only one thread
    356         code = """if 1:
    357         import signal
    358         import socket
    359         import struct
    360         import _testcapi
    361 
    362         signum = signal.SIGINT
    363         signals = (signum,)
    364 
    365         def handler(signum, frame):
    366             pass
    367 
    368         signal.signal(signum, handler)
    369 
    370         read, write = socket.socketpair()
    371         read.setblocking(False)
    372         write.setblocking(False)
    373         signal.set_wakeup_fd(write.fileno())
    374 
    375         _testcapi.raise_signal(signum)
    376 
    377         data = read.recv(1)
    378         if not data:
    379             raise Exception("no signum written")
    380         raised = struct.unpack('B', data)
    381         if raised != signals:
    382             raise Exception("%r != %r" % (raised, signals))
    383 
    384         read.close()
    385         write.close()
    386         """
    387 
    388         assert_python_ok('-c', code)
    389 
    390     @unittest.skipIf(_testcapi is None, 'need _testcapi')
    391     def test_send_error(self):
    392         # Use a subprocess to have only one thread.
    393         if os.name == 'nt':
    394             action = 'send'
    395         else:
    396             action = 'write'
    397         code = """if 1:
    398         import errno
    399         import signal
    400         import socket
    401         import sys
    402         import time
    403         import _testcapi
    404         from test.support import captured_stderr
    405 
    406         signum = signal.SIGINT
    407 
    408         def handler(signum, frame):
    409             pass
    410 
    411         signal.signal(signum, handler)
    412 
    413         read, write = socket.socketpair()
    414         read.setblocking(False)
    415         write.setblocking(False)
    416 
    417         signal.set_wakeup_fd(write.fileno())
    418 
    419         # Close sockets: send() will fail
    420         read.close()
    421         write.close()
    422 
    423         with captured_stderr() as err:
    424             _testcapi.raise_signal(signum)
    425 
    426         err = err.getvalue()
    427         if ('Exception ignored when trying to {action} to the signal wakeup fd'
    428             not in err):
    429             raise AssertionError(err)
    430         """.format(action=action)
    431         assert_python_ok('-c', code)
    432 
    433 
    434 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    435 class SiginterruptTest(unittest.TestCase):
    436 
    437     def readpipe_interrupted(self, interrupt):
    438         """Perform a read during which a signal will arrive.  Return True if the
    439         read is interrupted by the signal and raises an exception.  Return False
    440         if it returns normally.
    441         """
    442         # use a subprocess to have only one thread, to have a timeout on the
    443         # blocking read and to not touch signal handling in this process
    444         code = """if 1:
    445             import errno
    446             import os
    447             import signal
    448             import sys
    449 
    450             interrupt = %r
    451             r, w = os.pipe()
    452 
    453             def handler(signum, frame):
    454                 1 / 0
    455 
    456             signal.signal(signal.SIGALRM, handler)
    457             if interrupt is not None:
    458                 signal.siginterrupt(signal.SIGALRM, interrupt)
    459 
    460             print("ready")
    461             sys.stdout.flush()
    462 
    463             # run the test twice
    464             try:
    465                 for loop in range(2):
    466                     # send a SIGALRM in a second (during the read)
    467                     signal.alarm(1)
    468                     try:
    469                         # blocking call: read from a pipe without data
    470                         os.read(r, 1)
    471                     except ZeroDivisionError:
    472                         pass
    473                     else:
    474                         sys.exit(2)
    475                 sys.exit(3)
    476             finally:
    477                 os.close(r)
    478                 os.close(w)
    479         """ % (interrupt,)
    480         with spawn_python('-c', code) as process:
    481             try:
    482                 # wait until the child process is loaded and has started
    483                 first_line = process.stdout.readline()
    484 
    485                 stdout, stderr = process.communicate(timeout=5.0)
    486             except subprocess.TimeoutExpired:
    487                 process.kill()
    488                 return False
    489             else:
    490                 stdout = first_line + stdout
    491                 exitcode = process.wait()
    492                 if exitcode not in (2, 3):
    493                     raise Exception("Child error (exit code %s): %r"
    494                                     % (exitcode, stdout))
    495                 return (exitcode == 3)
    496 
    497     def test_without_siginterrupt(self):
    498         # If a signal handler is installed and siginterrupt is not called
    499         # at all, when that signal arrives, it interrupts a syscall that's in
    500         # progress.
    501         interrupted = self.readpipe_interrupted(None)
    502         self.assertTrue(interrupted)
    503 
    504     def test_siginterrupt_on(self):
    505         # If a signal handler is installed and siginterrupt is called with
    506         # a true value for the second argument, when that signal arrives, it
    507         # interrupts a syscall that's in progress.
    508         interrupted = self.readpipe_interrupted(True)
    509         self.assertTrue(interrupted)
    510 
    511     def test_siginterrupt_off(self):
    512         # If a signal handler is installed and siginterrupt is called with
    513         # a false value for the second argument, when that signal arrives, it
    514         # does not interrupt a syscall that's in progress.
    515         interrupted = self.readpipe_interrupted(False)
    516         self.assertFalse(interrupted)
    517 
    518 
    519 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    520 class ItimerTest(unittest.TestCase):
    521     def setUp(self):
    522         self.hndl_called = False
    523         self.hndl_count = 0
    524         self.itimer = None
    525         self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
    526 
    527     def tearDown(self):
    528         signal.signal(signal.SIGALRM, self.old_alarm)
    529         if self.itimer is not None: # test_itimer_exc doesn't change this attr
    530             # just ensure that itimer is stopped
    531             signal.setitimer(self.itimer, 0)
    532 
    533     def sig_alrm(self, *args):
    534         self.hndl_called = True
    535 
    536     def sig_vtalrm(self, *args):
    537         self.hndl_called = True
    538 
    539         if self.hndl_count > 3:
    540             # it shouldn't be here, because it should have been disabled.
    541             raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
    542                 "timer.")
    543         elif self.hndl_count == 3:
    544             # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
    545             signal.setitimer(signal.ITIMER_VIRTUAL, 0)
    546 
    547         self.hndl_count += 1
    548 
    549     def sig_prof(self, *args):
    550         self.hndl_called = True
    551         signal.setitimer(signal.ITIMER_PROF, 0)
    552 
    553     def test_itimer_exc(self):
    554         # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
    555         # defines it ?
    556         self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
    557         # Negative times are treated as zero on some platforms.
    558         if 0:
    559             self.assertRaises(signal.ItimerError,
    560                               signal.setitimer, signal.ITIMER_REAL, -1)
    561 
    562     def test_itimer_real(self):
    563         self.itimer = signal.ITIMER_REAL
    564         signal.setitimer(self.itimer, 1.0)
    565         signal.pause()
    566         self.assertEqual(self.hndl_called, True)
    567 
    568     # Issue 3864, unknown if this affects earlier versions of freebsd also
    569     @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
    570         'itimer not reliable (does not mix well with threading) on some BSDs.')
    571     def test_itimer_virtual(self):
    572         self.itimer = signal.ITIMER_VIRTUAL
    573         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
    574         signal.setitimer(self.itimer, 0.3, 0.2)
    575 
    576         start_time = time.monotonic()
    577         while time.monotonic() - start_time < 60.0:
    578             # use up some virtual time by doing real work
    579             _ = pow(12345, 67890, 10000019)
    580             if signal.getitimer(self.itimer) == (0.0, 0.0):
    581                 break # sig_vtalrm handler stopped this itimer
    582         else: # Issue 8424
    583             self.skipTest("timeout: likely cause: machine too slow or load too "
    584                           "high")
    585 
    586         # virtual itimer should be (0.0, 0.0) now
    587         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    588         # and the handler should have been called
    589         self.assertEqual(self.hndl_called, True)
    590 
    591     # Issue 3864, unknown if this affects earlier versions of freebsd also
    592     @unittest.skipIf(sys.platform=='freebsd6',
    593         'itimer not reliable (does not mix well with threading) on freebsd6')
    594     def test_itimer_prof(self):
    595         self.itimer = signal.ITIMER_PROF
    596         signal.signal(signal.SIGPROF, self.sig_prof)
    597         signal.setitimer(self.itimer, 0.2, 0.2)
    598 
    599         start_time = time.monotonic()
    600         while time.monotonic() - start_time < 60.0:
    601             # do some work
    602             _ = pow(12345, 67890, 10000019)
    603             if signal.getitimer(self.itimer) == (0.0, 0.0):
    604                 break # sig_prof handler stopped this itimer
    605         else: # Issue 8424
    606             self.skipTest("timeout: likely cause: machine too slow or load too "
    607                           "high")
    608 
    609         # profiling itimer should be (0.0, 0.0) now
    610         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    611         # and the handler should have been called
    612         self.assertEqual(self.hndl_called, True)
    613 
    614 
    615 class PendingSignalsTests(unittest.TestCase):
    616     """
    617     Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    618     functions.
    619     """
    620     @unittest.skipUnless(hasattr(signal, 'sigpending'),
    621                          'need signal.sigpending()')
    622     def test_sigpending_empty(self):
    623         self.assertEqual(signal.sigpending(), set())
    624 
    625     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    626                          'need signal.pthread_sigmask()')
    627     @unittest.skipUnless(hasattr(signal, 'sigpending'),
    628                          'need signal.sigpending()')
    629     def test_sigpending(self):
    630         code = """if 1:
    631             import os
    632             import signal
    633 
    634             def handler(signum, frame):
    635                 1/0
    636 
    637             signum = signal.SIGUSR1
    638             signal.signal(signum, handler)
    639 
    640             signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
    641             os.kill(os.getpid(), signum)
    642             pending = signal.sigpending()
    643             for sig in pending:
    644                 assert isinstance(sig, signal.Signals), repr(pending)
    645             if pending != {signum}:
    646                 raise Exception('%s != {%s}' % (pending, signum))
    647             try:
    648                 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    649             except ZeroDivisionError:
    650                 pass
    651             else:
    652                 raise Exception("ZeroDivisionError not raised")
    653         """
    654         assert_python_ok('-c', code)
    655 
    656     @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
    657                          'need signal.pthread_kill()')
    658     def test_pthread_kill(self):
    659         code = """if 1:
    660             import signal
    661             import threading
    662             import sys
    663 
    664             signum = signal.SIGUSR1
    665 
    666             def handler(signum, frame):
    667                 1/0
    668 
    669             signal.signal(signum, handler)
    670 
    671             if sys.platform == 'freebsd6':
    672                 # Issue #12392 and #12469: send a signal to the main thread
    673                 # doesn't work before the creation of the first thread on
    674                 # FreeBSD 6
    675                 def noop():
    676                     pass
    677                 thread = threading.Thread(target=noop)
    678                 thread.start()
    679                 thread.join()
    680 
    681             tid = threading.get_ident()
    682             try:
    683                 signal.pthread_kill(tid, signum)
    684             except ZeroDivisionError:
    685                 pass
    686             else:
    687                 raise Exception("ZeroDivisionError not raised")
    688         """
    689         assert_python_ok('-c', code)
    690 
    691     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    692                          'need signal.pthread_sigmask()')
    693     def wait_helper(self, blocked, test):
    694         """
    695         test: body of the "def test(signum):" function.
    696         blocked: number of the blocked signal
    697         """
    698         code = '''if 1:
    699         import signal
    700         import sys
    701         from signal import Signals
    702 
    703         def handler(signum, frame):
    704             1/0
    705 
    706         %s
    707 
    708         blocked = %s
    709         signum = signal.SIGALRM
    710 
    711         # child: block and wait the signal
    712         try:
    713             signal.signal(signum, handler)
    714             signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
    715 
    716             # Do the tests
    717             test(signum)
    718 
    719             # The handler must not be called on unblock
    720             try:
    721                 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
    722             except ZeroDivisionError:
    723                 print("the signal handler has been called",
    724                       file=sys.stderr)
    725                 sys.exit(1)
    726         except BaseException as err:
    727             print("error: {}".format(err), file=sys.stderr)
    728             sys.stderr.flush()
    729             sys.exit(1)
    730         ''' % (test.strip(), blocked)
    731 
    732         # sig*wait* must be called with the signal blocked: since the current
    733         # process might have several threads running, use a subprocess to have
    734         # a single thread.
    735         assert_python_ok('-c', code)
    736 
    737     @unittest.skipUnless(hasattr(signal, 'sigwait'),
    738                          'need signal.sigwait()')
    739     def test_sigwait(self):
    740         self.wait_helper(signal.SIGALRM, '''
    741         def test(signum):
    742             signal.alarm(1)
    743             received = signal.sigwait([signum])
    744             assert isinstance(received, signal.Signals), received
    745             if received != signum:
    746                 raise Exception('received %s, not %s' % (received, signum))
    747         ''')
    748 
    749     @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
    750                          'need signal.sigwaitinfo()')
    751     def test_sigwaitinfo(self):
    752         self.wait_helper(signal.SIGALRM, '''
    753         def test(signum):
    754             signal.alarm(1)
    755             info = signal.sigwaitinfo([signum])
    756             if info.si_signo != signum:
    757                 raise Exception("info.si_signo != %s" % signum)
    758         ''')
    759 
    760     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    761                          'need signal.sigtimedwait()')
    762     def test_sigtimedwait(self):
    763         self.wait_helper(signal.SIGALRM, '''
    764         def test(signum):
    765             signal.alarm(1)
    766             info = signal.sigtimedwait([signum], 10.1000)
    767             if info.si_signo != signum:
    768                 raise Exception('info.si_signo != %s' % signum)
    769         ''')
    770 
    771     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    772                          'need signal.sigtimedwait()')
    773     def test_sigtimedwait_poll(self):
    774         # check that polling with sigtimedwait works
    775         self.wait_helper(signal.SIGALRM, '''
    776         def test(signum):
    777             import os
    778             os.kill(os.getpid(), signum)
    779             info = signal.sigtimedwait([signum], 0)
    780             if info.si_signo != signum:
    781                 raise Exception('info.si_signo != %s' % signum)
    782         ''')
    783 
    784     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    785                          'need signal.sigtimedwait()')
    786     def test_sigtimedwait_timeout(self):
    787         self.wait_helper(signal.SIGALRM, '''
    788         def test(signum):
    789             received = signal.sigtimedwait([signum], 1.0)
    790             if received is not None:
    791                 raise Exception("received=%r" % (received,))
    792         ''')
    793 
    794     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
    795                          'need signal.sigtimedwait()')
    796     def test_sigtimedwait_negative_timeout(self):
    797         signum = signal.SIGALRM
    798         self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
    799 
    800     @unittest.skipUnless(hasattr(signal, 'sigwait'),
    801                          'need signal.sigwait()')
    802     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    803                          'need signal.pthread_sigmask()')
    804     @unittest.skipIf(threading is None, "test needs threading module")
    805     def test_sigwait_thread(self):
    806         # Check that calling sigwait() from a thread doesn't suspend the whole
    807         # process. A new interpreter is spawned to avoid problems when mixing
    808         # threads and fork(): only async-safe functions are allowed between
    809         # fork() and exec().
    810         assert_python_ok("-c", """if True:
    811             import os, threading, sys, time, signal
    812 
    813             # the default handler terminates the process
    814             signum = signal.SIGUSR1
    815 
    816             def kill_later():
    817                 # wait until the main thread is waiting in sigwait()
    818                 time.sleep(1)
    819                 os.kill(os.getpid(), signum)
    820 
    821             # the signal must be blocked by all the threads
    822             signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
    823             killer = threading.Thread(target=kill_later)
    824             killer.start()
    825             received = signal.sigwait([signum])
    826             if received != signum:
    827                 print("sigwait() received %s, not %s" % (received, signum),
    828                       file=sys.stderr)
    829                 sys.exit(1)
    830             killer.join()
    831             # unblock the signal, which should have been cleared by sigwait()
    832             signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    833         """)
    834 
    835     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    836                          'need signal.pthread_sigmask()')
    837     def test_pthread_sigmask_arguments(self):
    838         self.assertRaises(TypeError, signal.pthread_sigmask)
    839         self.assertRaises(TypeError, signal.pthread_sigmask, 1)
    840         self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
    841         self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
    842 
    843     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
    844                          'need signal.pthread_sigmask()')
    845     def test_pthread_sigmask(self):
    846         code = """if 1:
    847         import signal
    848         import os; import threading
    849 
    850         def handler(signum, frame):
    851             1/0
    852 
    853         def kill(signum):
    854             os.kill(os.getpid(), signum)
    855 
    856         def check_mask(mask):
    857             for sig in mask:
    858                 assert isinstance(sig, signal.Signals), repr(sig)
    859 
    860         def read_sigmask():
    861             sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
    862             check_mask(sigmask)
    863             return sigmask
    864 
    865         signum = signal.SIGUSR1
    866 
    867         # Install our signal handler
    868         old_handler = signal.signal(signum, handler)
    869 
    870         # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
    871         old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    872         check_mask(old_mask)
    873         try:
    874             kill(signum)
    875         except ZeroDivisionError:
    876             pass
    877         else:
    878             raise Exception("ZeroDivisionError not raised")
    879 
    880         # Block and then raise SIGUSR1. The signal is blocked: the signal
    881         # handler is not called, and the signal is now pending
    882         mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
    883         check_mask(mask)
    884         kill(signum)
    885 
    886         # Check the new mask
    887         blocked = read_sigmask()
    888         check_mask(blocked)
    889         if signum not in blocked:
    890             raise Exception("%s not in %s" % (signum, blocked))
    891         if old_mask ^ blocked != {signum}:
    892             raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
    893 
    894         # Unblock SIGUSR1
    895         try:
    896             # unblock the pending signal calls immediately the signal handler
    897             signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
    898         except ZeroDivisionError:
    899             pass
    900         else:
    901             raise Exception("ZeroDivisionError not raised")
    902         try:
    903             kill(signum)
    904         except ZeroDivisionError:
    905             pass
    906         else:
    907             raise Exception("ZeroDivisionError not raised")
    908 
    909         # Check the new mask
    910         unblocked = read_sigmask()
    911         if signum in unblocked:
    912             raise Exception("%s in %s" % (signum, unblocked))
    913         if blocked ^ unblocked != {signum}:
    914             raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
    915         if old_mask != unblocked:
    916             raise Exception("%s != %s" % (old_mask, unblocked))
    917         """
    918         assert_python_ok('-c', code)
    919 
    920     @unittest.skipIf(sys.platform == 'freebsd6',
    921         "issue #12392: send a signal to the main thread doesn't work "
    922         "before the creation of the first thread on FreeBSD 6")
    923     @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
    924                          'need signal.pthread_kill()')
    925     def test_pthread_kill_main_thread(self):
    926         # Test that a signal can be sent to the main thread with pthread_kill()
    927         # before any other thread has been created (see issue #12392).
    928         code = """if True:
    929             import threading
    930             import signal
    931             import sys
    932 
    933             def handler(signum, frame):
    934                 sys.exit(3)
    935 
    936             signal.signal(signal.SIGUSR1, handler)
    937             signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
    938             sys.exit(2)
    939         """
    940 
    941         with spawn_python('-c', code) as process:
    942             stdout, stderr = process.communicate()
    943             exitcode = process.wait()
    944             if exitcode != 3:
    945                 raise Exception("Child error (exit code %s): %s" %
    946                                 (exitcode, stdout))
    947 
    948 
    949 def tearDownModule():
    950     support.reap_children()
    951 
    952 if __name__ == "__main__":
    953     unittest.main()
    954