Home | History | Annotate | Download | only in test
      1 from test.test_support import verbose, run_unittest, import_module
      2 
      3 #Skip these tests if either fcntl or termios is not available
      4 fcntl = import_module('fcntl')
      5 import_module('termios')
      6 
      7 import errno
      8 import pty
      9 import os
     10 import sys
     11 import select
     12 import signal
     13 import socket
     14 import unittest
     15 
     16 TEST_STRING_1 = "I wish to buy a fish license.\n"
     17 TEST_STRING_2 = "For my pet fish, Eric.\n"
     18 
     19 if verbose:
     20     def debug(msg):
     21         print msg
     22 else:
     23     def debug(msg):
     24         pass
     25 
     26 
     27 def normalize_output(data):
     28     # Some operating systems do conversions on newline.  We could possibly
     29     # fix that by doing the appropriate termios.tcsetattr()s.  I couldn't
     30     # figure out the right combo on Tru64 and I don't have an IRIX box.
     31     # So just normalize the output and doc the problem O/Ses by allowing
     32     # certain combinations for some platforms, but avoid allowing other
     33     # differences (like extra whitespace, trailing garbage, etc.)
     34 
     35     # This is about the best we can do without getting some feedback
     36     # from someone more knowledgable.
     37 
     38     # OSF/1 (Tru64) apparently turns \n into \r\r\n.
     39     if data.endswith('\r\r\n'):
     40         return data.replace('\r\r\n', '\n')
     41 
     42     # IRIX apparently turns \n into \r\n.
     43     if data.endswith('\r\n'):
     44         return data.replace('\r\n', '\n')
     45 
     46     return data
     47 
     48 
     49 # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
     50 # because pty code is not too portable.
     51 # XXX(nnorwitz):  these tests leak fds when there is an error.
     52 class PtyTest(unittest.TestCase):
     53     def setUp(self):
     54         # isatty() and close() can hang on some platforms.  Set an alarm
     55         # before running the test to make sure we don't hang forever.
     56         self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
     57         signal.alarm(10)
     58 
     59     def tearDown(self):
     60         # remove alarm, restore old alarm handler
     61         signal.alarm(0)
     62         signal.signal(signal.SIGALRM, self.old_alarm)
     63 
     64     def handle_sig(self, sig, frame):
     65         self.fail("isatty hung")
     66 
     67     def test_basic(self):
     68         try:
     69             debug("Calling master_open()")
     70             master_fd, slave_name = pty.master_open()
     71             debug("Got master_fd '%d', slave_name '%s'" %
     72                   (master_fd, slave_name))
     73             debug("Calling slave_open(%r)" % (slave_name,))
     74             slave_fd = pty.slave_open(slave_name)
     75             debug("Got slave_fd '%d'" % slave_fd)
     76         except OSError:
     77             # " An optional feature could not be imported " ... ?
     78             raise unittest.SkipTest, "Pseudo-terminals (seemingly) not functional."
     79 
     80         self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
     81 
     82         # Solaris requires reading the fd before anything is returned.
     83         # My guess is that since we open and close the slave fd
     84         # in master_open(), we need to read the EOF.
     85 
     86         # Ensure the fd is non-blocking in case there's nothing to read.
     87         orig_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
     88         fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags | os.O_NONBLOCK)
     89         try:
     90             s1 = os.read(master_fd, 1024)
     91             self.assertEqual('', s1)
     92         except OSError, e:
     93             if e.errno != errno.EAGAIN:
     94                 raise
     95         # Restore the original flags.
     96         fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags)
     97 
     98         debug("Writing to slave_fd")
     99         os.write(slave_fd, TEST_STRING_1)
    100         s1 = os.read(master_fd, 1024)
    101         self.assertEqual('I wish to buy a fish license.\n',
    102                          normalize_output(s1))
    103 
    104         debug("Writing chunked output")
    105         os.write(slave_fd, TEST_STRING_2[:5])
    106         os.write(slave_fd, TEST_STRING_2[5:])
    107         s2 = os.read(master_fd, 1024)
    108         self.assertEqual('For my pet fish, Eric.\n', normalize_output(s2))
    109 
    110         os.close(slave_fd)
    111         os.close(master_fd)
    112 
    113 
    114     def test_fork(self):
    115         debug("calling pty.fork()")
    116         pid, master_fd = pty.fork()
    117         if pid == pty.CHILD:
    118             # stdout should be connected to a tty.
    119             if not os.isatty(1):
    120                 debug("Child's fd 1 is not a tty?!")
    121                 os._exit(3)
    122 
    123             # After pty.fork(), the child should already be a session leader.
    124             # (on those systems that have that concept.)
    125             debug("In child, calling os.setsid()")
    126             try:
    127                 os.setsid()
    128             except OSError:
    129                 # Good, we already were session leader
    130                 debug("Good: OSError was raised.")
    131                 pass
    132             except AttributeError:
    133                 # Have pty, but not setsid()?
    134                 debug("No setsid() available?")
    135                 pass
    136             except:
    137                 # We don't want this error to propagate, escaping the call to
    138                 # os._exit() and causing very peculiar behavior in the calling
    139                 # regrtest.py !
    140                 # Note: could add traceback printing here.
    141                 debug("An unexpected error was raised.")
    142                 os._exit(1)
    143             else:
    144                 debug("os.setsid() succeeded! (bad!)")
    145                 os._exit(2)
    146             os._exit(4)
    147         else:
    148             debug("Waiting for child (%d) to finish." % pid)
    149             # In verbose mode, we have to consume the debug output from the
    150             # child or the child will block, causing this test to hang in the
    151             # parent's waitpid() call.  The child blocks after a
    152             # platform-dependent amount of data is written to its fd.  On
    153             # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
    154             # X even the small writes in the child above will block it.  Also
    155             # on Linux, the read() will raise an OSError (input/output error)
    156             # when it tries to read past the end of the buffer but the child's
    157             # already exited, so catch and discard those exceptions.  It's not
    158             # worth checking for EIO.
    159             while True:
    160                 try:
    161                     data = os.read(master_fd, 80)
    162                 except OSError:
    163                     break
    164                 if not data:
    165                     break
    166                 sys.stdout.write(data.replace('\r\n', '\n'))
    167 
    168             ##line = os.read(master_fd, 80)
    169             ##lines = line.replace('\r\n', '\n').split('\n')
    170             ##if False and lines != ['In child, calling os.setsid()',
    171             ##             'Good: OSError was raised.', '']:
    172             ##    raise TestFailed("Unexpected output from child: %r" % line)
    173 
    174             (pid, status) = os.waitpid(pid, 0)
    175             res = status >> 8
    176             debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
    177             if res == 1:
    178                 self.fail("Child raised an unexpected exception in os.setsid()")
    179             elif res == 2:
    180                 self.fail("pty.fork() failed to make child a session leader.")
    181             elif res == 3:
    182                 self.fail("Child spawned by pty.fork() did not have a tty as stdout")
    183             elif res != 4:
    184                 self.fail("pty.fork() failed for unknown reasons.")
    185 
    186             ##debug("Reading from master_fd now that the child has exited")
    187             ##try:
    188             ##    s1 = os.read(master_fd, 1024)
    189             ##except os.error:
    190             ##    pass
    191             ##else:
    192             ##    raise TestFailed("Read from master_fd did not raise exception")
    193 
    194         os.close(master_fd)
    195 
    196         # pty.fork() passed.
    197 
    198 
    199 class SmallPtyTests(unittest.TestCase):
    200     """These tests don't spawn children or hang."""
    201 
    202     def setUp(self):
    203         self.orig_stdin_fileno = pty.STDIN_FILENO
    204         self.orig_stdout_fileno = pty.STDOUT_FILENO
    205         self.orig_pty_select = pty.select
    206         self.fds = []  # A list of file descriptors to close.
    207         self.select_rfds_lengths = []
    208         self.select_rfds_results = []
    209 
    210     def tearDown(self):
    211         pty.STDIN_FILENO = self.orig_stdin_fileno
    212         pty.STDOUT_FILENO = self.orig_stdout_fileno
    213         pty.select = self.orig_pty_select
    214         for fd in self.fds:
    215             try:
    216                 os.close(fd)
    217             except:
    218                 pass
    219 
    220     def _pipe(self):
    221         pipe_fds = os.pipe()
    222         self.fds.extend(pipe_fds)
    223         return pipe_fds
    224 
    225     def _mock_select(self, rfds, wfds, xfds):
    226         # This will raise IndexError when no more expected calls exist.
    227         self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
    228         return self.select_rfds_results.pop(0), [], []
    229 
    230     def test__copy_to_each(self):
    231         """Test the normal data case on both master_fd and stdin."""
    232         read_from_stdout_fd, mock_stdout_fd = self._pipe()
    233         pty.STDOUT_FILENO = mock_stdout_fd
    234         mock_stdin_fd, write_to_stdin_fd = self._pipe()
    235         pty.STDIN_FILENO = mock_stdin_fd
    236         socketpair = socket.socketpair()
    237         masters = [s.fileno() for s in socketpair]
    238         self.fds.extend(masters)
    239 
    240         # Feed data.  Smaller than PIPEBUF.  These writes will not block.
    241         os.write(masters[1], b'from master')
    242         os.write(write_to_stdin_fd, b'from stdin')
    243 
    244         # Expect two select calls, the last one will cause IndexError
    245         pty.select = self._mock_select
    246         self.select_rfds_lengths.append(2)
    247         self.select_rfds_results.append([mock_stdin_fd, masters[0]])
    248         self.select_rfds_lengths.append(2)
    249 
    250         with self.assertRaises(IndexError):
    251             pty._copy(masters[0])
    252 
    253         # Test that the right data went to the right places.
    254         rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
    255         self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
    256         self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
    257         self.assertEqual(os.read(masters[1], 20), b'from stdin')
    258 
    259     def test__copy_eof_on_all(self):
    260         """Test the empty read EOF case on both master_fd and stdin."""
    261         read_from_stdout_fd, mock_stdout_fd = self._pipe()
    262         pty.STDOUT_FILENO = mock_stdout_fd
    263         mock_stdin_fd, write_to_stdin_fd = self._pipe()
    264         pty.STDIN_FILENO = mock_stdin_fd
    265         socketpair = socket.socketpair()
    266         masters = [s.fileno() for s in socketpair]
    267         self.fds.extend(masters)
    268 
    269         os.close(masters[1])
    270         socketpair[1].close()
    271         os.close(write_to_stdin_fd)
    272 
    273         # Expect two select calls, the last one will cause IndexError
    274         pty.select = self._mock_select
    275         self.select_rfds_lengths.append(2)
    276         self.select_rfds_results.append([mock_stdin_fd, masters[0]])
    277         # We expect that both fds were removed from the fds list as they
    278         # both encountered an EOF before the second select call.
    279         self.select_rfds_lengths.append(0)
    280 
    281         with self.assertRaises(IndexError):
    282             pty._copy(masters[0])
    283 
    284 
    285 def test_main(verbose=None):
    286     run_unittest(SmallPtyTests, PtyTest)
    287 
    288 if __name__ == "__main__":
    289     test_main()
    290