Home | History | Annotate | Download | only in test
      1 from test.support import verbose, import_module, reap_children
      2 
      3 # Skip these tests if termios is not available
      4 import_module('termios')
      5 
      6 import errno
      7 import pty
      8 import os
      9 import sys
     10 import select
     11 import signal
     12 import socket
     13 import io # readline
     14 import unittest
     15 
     16 TEST_STRING_1 = b"I wish to buy a fish license.\n"
     17 TEST_STRING_2 = b"For my pet fish, Eric.\n"
     18 
     19 if verbose:
     20     def debug(msg):
     21         print(msg)
     22 else:
     23     def debug(msg):
     24         pass
     25 
     26 
     27 # Note that os.read() is nondeterministic so we need to be very careful
     28 # to make the test suite deterministic.  A normal call to os.read() may
     29 # give us less than expected.
     30 #
     31 # Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I get
     32 # back 'foo\r\n' at the other end.  The behavior depends on the termios
     33 # setting.  The newline translation may be OS-specific.  To make the
     34 # test suite deterministic and OS-independent, the functions _readline
     35 # and normalize_output can be used.
     36 
     37 def normalize_output(data):
     38     # Some operating systems do conversions on newline.  We could possibly fix
     39     # that by doing the appropriate termios.tcsetattr()s.  I couldn't figure out
     40     # the right combo on Tru64.  So, just normalize the output and doc the
     41     # problem O/Ses by allowing certain combinations for some platforms, but
     42     # avoid allowing other differences (like extra whitespace, trailing garbage,
     43     # etc.)
     44 
     45     # This is about the best we can do without getting some feedback
     46     # from someone more knowledgable.
     47 
     48     # OSF/1 (Tru64) apparently turns \n into \r\r\n.
     49     if data.endswith(b'\r\r\n'):
     50         return data.replace(b'\r\r\n', b'\n')
     51 
     52     if data.endswith(b'\r\n'):
     53         return data.replace(b'\r\n', b'\n')
     54 
     55     return data
     56 
     57 def _readline(fd):
     58     """Read one line.  May block forever if no newline is read."""
     59     reader = io.FileIO(fd, mode='rb', closefd=False)
     60     return reader.readline()
     61 
     62 
     63 
     64 # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
     65 # because pty code is not too portable.
     66 # XXX(nnorwitz):  these tests leak fds when there is an error.
     67 class PtyTest(unittest.TestCase):
     68     def setUp(self):
     69         # isatty() and close() can hang on some platforms.  Set an alarm
     70         # before running the test to make sure we don't hang forever.
     71         old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
     72         self.addCleanup(signal.signal, signal.SIGALRM, old_alarm)
     73         self.addCleanup(signal.alarm, 0)
     74         signal.alarm(10)
     75 
     76     def handle_sig(self, sig, frame):
     77         self.fail("isatty hung")
     78 
     79     def test_basic(self):
     80         try:
     81             debug("Calling master_open()")
     82             master_fd, slave_name = pty.master_open()
     83             debug("Got master_fd '%d', slave_name '%s'" %
     84                   (master_fd, slave_name))
     85             debug("Calling slave_open(%r)" % (slave_name,))
     86             slave_fd = pty.slave_open(slave_name)
     87             debug("Got slave_fd '%d'" % slave_fd)
     88         except OSError:
     89             # " An optional feature could not be imported " ... ?
     90             raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")
     91 
     92         self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
     93 
     94         # Solaris requires reading the fd before anything is returned.
     95         # My guess is that since we open and close the slave fd
     96         # in master_open(), we need to read the EOF.
     97 
     98         # Ensure the fd is non-blocking in case there's nothing to read.
     99         blocking = os.get_blocking(master_fd)
    100         try:
    101             os.set_blocking(master_fd, False)
    102             try:
    103                 s1 = os.read(master_fd, 1024)
    104                 self.assertEqual(b'', s1)
    105             except OSError as e:
    106                 if e.errno != errno.EAGAIN:
    107                     raise
    108         finally:
    109             # Restore the original flags.
    110             os.set_blocking(master_fd, blocking)
    111 
    112         debug("Writing to slave_fd")
    113         os.write(slave_fd, TEST_STRING_1)
    114         s1 = _readline(master_fd)
    115         self.assertEqual(b'I wish to buy a fish license.\n',
    116                          normalize_output(s1))
    117 
    118         debug("Writing chunked output")
    119         os.write(slave_fd, TEST_STRING_2[:5])
    120         os.write(slave_fd, TEST_STRING_2[5:])
    121         s2 = _readline(master_fd)
    122         self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2))
    123 
    124         os.close(slave_fd)
    125         os.close(master_fd)
    126 
    127 
    128     def test_fork(self):
    129         debug("calling pty.fork()")
    130         pid, master_fd = pty.fork()
    131         if pid == pty.CHILD:
    132             # stdout should be connected to a tty.
    133             if not os.isatty(1):
    134                 debug("Child's fd 1 is not a tty?!")
    135                 os._exit(3)
    136 
    137             # After pty.fork(), the child should already be a session leader.
    138             # (on those systems that have that concept.)
    139             debug("In child, calling os.setsid()")
    140             try:
    141                 os.setsid()
    142             except OSError:
    143                 # Good, we already were session leader
    144                 debug("Good: OSError was raised.")
    145                 pass
    146             except AttributeError:
    147                 # Have pty, but not setsid()?
    148                 debug("No setsid() available?")
    149                 pass
    150             except:
    151                 # We don't want this error to propagate, escaping the call to
    152                 # os._exit() and causing very peculiar behavior in the calling
    153                 # regrtest.py !
    154                 # Note: could add traceback printing here.
    155                 debug("An unexpected error was raised.")
    156                 os._exit(1)
    157             else:
    158                 debug("os.setsid() succeeded! (bad!)")
    159                 os._exit(2)
    160             os._exit(4)
    161         else:
    162             debug("Waiting for child (%d) to finish." % pid)
    163             # In verbose mode, we have to consume the debug output from the
    164             # child or the child will block, causing this test to hang in the
    165             # parent's waitpid() call.  The child blocks after a
    166             # platform-dependent amount of data is written to its fd.  On
    167             # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
    168             # X even the small writes in the child above will block it.  Also
    169             # on Linux, the read() will raise an OSError (input/output error)
    170             # when it tries to read past the end of the buffer but the child's
    171             # already exited, so catch and discard those exceptions.  It's not
    172             # worth checking for EIO.
    173             while True:
    174                 try:
    175                     data = os.read(master_fd, 80)
    176                 except OSError:
    177                     break
    178                 if not data:
    179                     break
    180                 sys.stdout.write(str(data.replace(b'\r\n', b'\n'),
    181                                      encoding='ascii'))
    182 
    183             ##line = os.read(master_fd, 80)
    184             ##lines = line.replace('\r\n', '\n').split('\n')
    185             ##if False and lines != ['In child, calling os.setsid()',
    186             ##             'Good: OSError was raised.', '']:
    187             ##    raise TestFailed("Unexpected output from child: %r" % line)
    188 
    189             (pid, status) = os.waitpid(pid, 0)
    190             res = status >> 8
    191             debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
    192             if res == 1:
    193                 self.fail("Child raised an unexpected exception in os.setsid()")
    194             elif res == 2:
    195                 self.fail("pty.fork() failed to make child a session leader.")
    196             elif res == 3:
    197                 self.fail("Child spawned by pty.fork() did not have a tty as stdout")
    198             elif res != 4:
    199                 self.fail("pty.fork() failed for unknown reasons.")
    200 
    201             ##debug("Reading from master_fd now that the child has exited")
    202             ##try:
    203             ##    s1 = os.read(master_fd, 1024)
    204             ##except OSError:
    205             ##    pass
    206             ##else:
    207             ##    raise TestFailed("Read from master_fd did not raise exception")
    208 
    209         os.close(master_fd)
    210 
    211         # pty.fork() passed.
    212 
    213 
    214 class SmallPtyTests(unittest.TestCase):
    215     """These tests don't spawn children or hang."""
    216 
    217     def setUp(self):
    218         self.orig_stdin_fileno = pty.STDIN_FILENO
    219         self.orig_stdout_fileno = pty.STDOUT_FILENO
    220         self.orig_pty_select = pty.select
    221         self.fds = []  # A list of file descriptors to close.
    222         self.files = []
    223         self.select_rfds_lengths = []
    224         self.select_rfds_results = []
    225 
    226     def tearDown(self):
    227         pty.STDIN_FILENO = self.orig_stdin_fileno
    228         pty.STDOUT_FILENO = self.orig_stdout_fileno
    229         pty.select = self.orig_pty_select
    230         for file in self.files:
    231             try:
    232                 file.close()
    233             except OSError:
    234                 pass
    235         for fd in self.fds:
    236             try:
    237                 os.close(fd)
    238             except OSError:
    239                 pass
    240 
    241     def _pipe(self):
    242         pipe_fds = os.pipe()
    243         self.fds.extend(pipe_fds)
    244         return pipe_fds
    245 
    246     def _socketpair(self):
    247         socketpair = socket.socketpair()
    248         self.files.extend(socketpair)
    249         return socketpair
    250 
    251     def _mock_select(self, rfds, wfds, xfds):
    252         # This will raise IndexError when no more expected calls exist.
    253         self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
    254         return self.select_rfds_results.pop(0), [], []
    255 
    256     def test__copy_to_each(self):
    257         """Test the normal data case on both master_fd and stdin."""
    258         read_from_stdout_fd, mock_stdout_fd = self._pipe()
    259         pty.STDOUT_FILENO = mock_stdout_fd
    260         mock_stdin_fd, write_to_stdin_fd = self._pipe()
    261         pty.STDIN_FILENO = mock_stdin_fd
    262         socketpair = self._socketpair()
    263         masters = [s.fileno() for s in socketpair]
    264 
    265         # Feed data.  Smaller than PIPEBUF.  These writes will not block.
    266         os.write(masters[1], b'from master')
    267         os.write(write_to_stdin_fd, b'from stdin')
    268 
    269         # Expect two select calls, the last one will cause IndexError
    270         pty.select = self._mock_select
    271         self.select_rfds_lengths.append(2)
    272         self.select_rfds_results.append([mock_stdin_fd, masters[0]])
    273         self.select_rfds_lengths.append(2)
    274 
    275         with self.assertRaises(IndexError):
    276             pty._copy(masters[0])
    277 
    278         # Test that the right data went to the right places.
    279         rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
    280         self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
    281         self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
    282         self.assertEqual(os.read(masters[1], 20), b'from stdin')
    283 
    284     def test__copy_eof_on_all(self):
    285         """Test the empty read EOF case on both master_fd and stdin."""
    286         read_from_stdout_fd, mock_stdout_fd = self._pipe()
    287         pty.STDOUT_FILENO = mock_stdout_fd
    288         mock_stdin_fd, write_to_stdin_fd = self._pipe()
    289         pty.STDIN_FILENO = mock_stdin_fd
    290         socketpair = self._socketpair()
    291         masters = [s.fileno() for s in socketpair]
    292 
    293         socketpair[1].close()
    294         os.close(write_to_stdin_fd)
    295 
    296         # Expect two select calls, the last one will cause IndexError
    297         pty.select = self._mock_select
    298         self.select_rfds_lengths.append(2)
    299         self.select_rfds_results.append([mock_stdin_fd, masters[0]])
    300         # We expect that both fds were removed from the fds list as they
    301         # both encountered an EOF before the second select call.
    302         self.select_rfds_lengths.append(0)
    303 
    304         with self.assertRaises(IndexError):
    305             pty._copy(masters[0])
    306 
    307 
    308 def tearDownModule():
    309     reap_children()
    310 
    311 if __name__ == "__main__":
    312     unittest.main()
    313