from test.test_support import verbose, run_unittest, import_module

# Skip these tests if either fcntl or termios is not available.
fcntl = import_module('fcntl')
import_module('termios')

import errno
import pty
import os
import sys
import select
import signal
import socket
import unittest

TEST_STRING_1 = "I wish to buy a fish license.\n"
TEST_STRING_2 = "For my pet fish, Eric.\n"

if verbose:
    def debug(msg):
        """Print a progress message (only active in verbose mode)."""
        # Parenthesised single-argument form behaves the same as the
        # Python 2 print statement and also parses on Python 3.
        print(msg)
else:
    def debug(msg):
        """Discard the progress message (non-verbose mode)."""
        pass


def normalize_output(data):
    """Return *data* with platform-specific newline expansions undone.

    Only data that ends with one of the known expansions is rewritten;
    anything else is returned unchanged so that other differences
    (extra whitespace, trailing garbage, ...) still fail the comparison.
    """
    # Some operating systems do conversions on newline.  We could possibly
    # fix that by doing the appropriate termios.tcsetattr()s.  I couldn't
    # figure out the right combo on Tru64 and I don't have an IRIX box.
    # So just normalize the output and doc the problem O/Ses by allowing
    # certain combinations for some platforms, but avoid allowing other
    # differences (like extra whitespace, trailing garbage, etc.)

    # This is about the best we can do without getting some feedback
    # from someone more knowledgeable.

    # OSF/1 (Tru64) apparently turns \n into \r\r\n.
    if data.endswith('\r\r\n'):
        return data.replace('\r\r\n', '\n')

    # IRIX apparently turns \n into \r\n.
    if data.endswith('\r\n'):
        return data.replace('\r\n', '\n')

    return data


# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
# because pty code is not too portable.
class PtyTest(unittest.TestCase):
    def setUp(self):
        """Install a 10 second SIGALRM watchdog around each test."""
        # isatty() and close() can hang on some platforms.  Set an alarm
        # before running the test to make sure we don't hang forever.
        self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
        signal.alarm(10)

    def tearDown(self):
        """Cancel the watchdog and restore the previous SIGALRM handler."""
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def handle_sig(self, sig, frame):
        """SIGALRM handler: fail the running test instead of hanging."""
        self.fail("isatty hung")

    def test_basic(self):
        # Open a master/slave pair, check the slave is a tty and that data
        # written to the slave can be read back from the master.
        try:
            debug("Calling master_open()")
            master_fd, slave_name = pty.master_open()
            debug("Got master_fd '%d', slave_name '%s'" %
                  (master_fd, slave_name))
            debug("Calling slave_open(%r)" % (slave_name,))
            slave_fd = pty.slave_open(slave_name)
            debug("Got slave_fd '%d'" % slave_fd)
        except OSError:
            # " An optional feature could not be imported " ... ?
            raise unittest.SkipTest(
                "Pseudo-terminals (seemingly) not functional.")

        # From here on both descriptors are open; close them even when an
        # assertion below fails so a failing run does not leak fds.
        try:
            self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')

            # Solaris requires reading the fd before anything is returned.
            # My guess is that since we open and close the slave fd
            # in master_open(), we need to read the EOF.

            # Ensure the fd is non-blocking in case there's nothing to read.
            orig_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
            fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags | os.O_NONBLOCK)
            try:
                s1 = os.read(master_fd, 1024)
                self.assertEqual('', s1)
            except OSError as e:
                # EAGAIN just means there was nothing to read.
                if e.errno != errno.EAGAIN:
                    raise
            # Restore the original flags.
            fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags)

            debug("Writing to slave_fd")
            os.write(slave_fd, TEST_STRING_1)
            s1 = os.read(master_fd, 1024)
            self.assertEqual('I wish to buy a fish license.\n',
                             normalize_output(s1))

            debug("Writing chunked output")
            os.write(slave_fd, TEST_STRING_2[:5])
            os.write(slave_fd, TEST_STRING_2[5:])
            s2 = os.read(master_fd, 1024)
            self.assertEqual('For my pet fish, Eric.\n', normalize_output(s2))
        finally:
            try:
                os.close(slave_fd)
            finally:
                os.close(master_fd)

    def test_fork(self):
        # The child reports what it observed through its exit status:
        #   1 - unexpected exception from os.setsid()
        #   2 - os.setsid() succeeded (child was not a session leader)
        #   3 - fd 1 is not a tty
        #   4 - everything as expected
        debug("calling pty.fork()")
        pid, master_fd = pty.fork()
        if pid == pty.CHILD:
            # stdout should be connected to a tty.
            if not os.isatty(1):
                debug("Child's fd 1 is not a tty?!")
                os._exit(3)

            # After pty.fork(), the child should already be a session leader.
            # (on those systems that have that concept.)
            debug("In child, calling os.setsid()")
            try:
                os.setsid()
            except OSError:
                # Good, we already were session leader
                debug("Good: OSError was raised.")
            except AttributeError:
                # Have pty, but not setsid()?
                debug("No setsid() available?")
            except:
                # Deliberately bare: we don't want this error to propagate,
                # escaping the call to os._exit() and causing very peculiar
                # behavior in the calling regrtest.py !
                # Note: could add traceback printing here.
                debug("An unexpected error was raised.")
                os._exit(1)
            else:
                debug("os.setsid() succeeded! (bad!)")
                os._exit(2)
            os._exit(4)
        else:
            # Close master_fd even if one of the self.fail() calls below
            # fires, so a failing run does not leak the descriptor.
            try:
                debug("Waiting for child (%d) to finish." % pid)
                # In verbose mode, we have to consume the debug output from
                # the child or the child will block, causing this test to
                # hang in the parent's waitpid() call.  The child blocks
                # after a platform-dependent amount of data is written to
                # its fd.  On Linux 2.6, it's 4000 bytes and the child won't
                # block, but on OS X even the small writes in the child
                # above will block it.  Also on Linux, the read() will raise
                # an OSError (input/output error) when it tries to read past
                # the end of the buffer but the child's already exited, so
                # catch and discard those exceptions.  It's not worth
                # checking for EIO.
                while True:
                    try:
                        data = os.read(master_fd, 80)
                    except OSError:
                        break
                    if not data:
                        break
                    sys.stdout.write(data.replace('\r\n', '\n'))

                ##line = os.read(master_fd, 80)
                ##lines = line.replace('\r\n', '\n').split('\n')
                ##if False and lines != ['In child, calling os.setsid()',
                ##             'Good: OSError was raised.', '']:
                ##    raise TestFailed("Unexpected output from child: %r" % line)

                (pid, status) = os.waitpid(pid, 0)
                # The exit code lives in the high byte of the wait status.
                res = status >> 8
                debug("Child (%d) exited with status %d (%d)." %
                      (pid, res, status))
                if res == 1:
                    self.fail("Child raised an unexpected exception in os.setsid()")
                elif res == 2:
                    self.fail("pty.fork() failed to make child a session leader.")
                elif res == 3:
                    self.fail("Child spawned by pty.fork() did not have a tty as stdout")
                elif res != 4:
                    self.fail("pty.fork() failed for unknown reasons.")

                ##debug("Reading from master_fd now that the child has exited")
                ##try:
                ##    s1 = os.read(master_fd, 1024)
                ##except os.error:
                ##    pass
                ##else:
                ##    raise TestFailed("Read from master_fd did not raise exception")
            finally:
                os.close(master_fd)

        # pty.fork() passed.


class SmallPtyTests(unittest.TestCase):
    """These tests don't spawn children or hang."""

    def setUp(self):
        """Remember the pty module globals that the tests overwrite."""
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        # Expected len(rfds) and canned readable-fd result for each
        # successive call to _mock_select().
        self.select_rfds_lengths = []
        self.select_rfds_results = []

    def tearDown(self):
        """Restore the pty module globals and close every recorded fd."""
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for fd in self.fds:
            try:
                os.close(fd)
            except OSError:
                # Best effort: the test may already have closed this fd.
                pass

    def _pipe(self):
        """Create a pipe, register both ends for cleanup, return them."""
        pipe_fds = os.pipe()
        self.fds.extend(pipe_fds)
        return pipe_fds

    def _mock_select(self, rfds, wfds, xfds):
        """Stand-in for select.select() driven by the expectation lists."""
        # This will raise IndexError when no more expected calls exist.
        self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
        return self.select_rfds_results.pop(0), [], []

    def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')

    def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        # Close the writing ends so that both reads in _copy() see EOF.
        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])


def test_main(verbose=None):
    """Entry point used by regrtest: run both test classes."""
    run_unittest(SmallPtyTests, PtyTest)

if __name__ == "__main__":
    test_main()