Home | History | Annotate | Download | only in test
# Test case for the select.poll() function
      2 
      3 import os, select, random, unittest
      4 import _testcapi
      5 from test.test_support import TESTFN, run_unittest
      6 
      7 try:
      8     select.poll
      9 except AttributeError:
     10     raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
     11 
     12 
     13 def find_ready_matching(ready, flag):
     14     match = []
     15     for fd, mode in ready:
     16         if mode & flag:
     17             match.append(fd)
     18     return match
     19 
     20 class PollTests(unittest.TestCase):
     21 
     22     def test_poll1(self):
     23         # Basic functional test of poll object
     24         # Create a bunch of pipe and test that poll works with them.
     25 
     26         p = select.poll()
     27 
     28         NUM_PIPES = 12
     29         MSG = " This is a test."
     30         MSG_LEN = len(MSG)
     31         readers = []
     32         writers = []
     33         r2w = {}
     34         w2r = {}
     35 
     36         for i in range(NUM_PIPES):
     37             rd, wr = os.pipe()
     38             p.register(rd)
     39             p.modify(rd, select.POLLIN)
     40             p.register(wr, select.POLLOUT)
     41             readers.append(rd)
     42             writers.append(wr)
     43             r2w[rd] = wr
     44             w2r[wr] = rd
     45 
     46         bufs = []
     47 
     48         while writers:
     49             ready = p.poll()
     50             ready_writers = find_ready_matching(ready, select.POLLOUT)
     51             if not ready_writers:
     52                 raise RuntimeError, "no pipes ready for writing"
     53             wr = random.choice(ready_writers)
     54             os.write(wr, MSG)
     55 
     56             ready = p.poll()
     57             ready_readers = find_ready_matching(ready, select.POLLIN)
     58             if not ready_readers:
     59                 raise RuntimeError, "no pipes ready for reading"
     60             rd = random.choice(ready_readers)
     61             buf = os.read(rd, MSG_LEN)
     62             self.assertEqual(len(buf), MSG_LEN)
     63             bufs.append(buf)
     64             os.close(r2w[rd]) ; os.close( rd )
     65             p.unregister( r2w[rd] )
     66             p.unregister( rd )
     67             writers.remove(r2w[rd])
     68 
     69         self.assertEqual(bufs, [MSG] * NUM_PIPES)
     70 
     71     def poll_unit_tests(self):
     72         # returns NVAL for invalid file descriptor
     73         FD = 42
     74         try:
     75             os.close(FD)
     76         except OSError:
     77             pass
     78         p = select.poll()
     79         p.register(FD)
     80         r = p.poll()
     81         self.assertEqual(r[0], (FD, select.POLLNVAL))
     82 
     83         f = open(TESTFN, 'w')
     84         fd = f.fileno()
     85         p = select.poll()
     86         p.register(f)
     87         r = p.poll()
     88         self.assertEqual(r[0][0], fd)
     89         f.close()
     90         r = p.poll()
     91         self.assertEqual(r[0], (fd, select.POLLNVAL))
     92         os.unlink(TESTFN)
     93 
     94         # type error for invalid arguments
     95         p = select.poll()
     96         self.assertRaises(TypeError, p.register, p)
     97         self.assertRaises(TypeError, p.unregister, p)
     98 
     99         # can't unregister non-existent object
    100         p = select.poll()
    101         self.assertRaises(KeyError, p.unregister, 3)
    102 
    103         # Test error cases
    104         pollster = select.poll()
    105         class Nope:
    106             pass
    107 
    108         class Almost:
    109             def fileno(self):
    110                 return 'fileno'
    111 
    112         self.assertRaises(TypeError, pollster.register, Nope(), 0)
    113         self.assertRaises(TypeError, pollster.register, Almost(), 0)
    114 
    115     # Another test case for poll().  This is copied from the test case for
    116     # select(), modified to use poll() instead.
    117 
    118     def test_poll2(self):
    119         cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
    120         p = os.popen(cmd, 'r')
    121         pollster = select.poll()
    122         pollster.register( p, select.POLLIN )
    123         for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
    124             fdlist = pollster.poll(tout)
    125             if (fdlist == []):
    126                 continue
    127             fd, flags = fdlist[0]
    128             if flags & select.POLLHUP:
    129                 line = p.readline()
    130                 if line != "":
    131                     self.fail('error: pipe seems to be closed, but still returns data')
    132                 continue
    133 
    134             elif flags & select.POLLIN:
    135                 line = p.readline()
    136                 if not line:
    137                     break
    138                 continue
    139             else:
    140                 self.fail('Unexpected return value from select.poll: %s' % fdlist)
    141         p.close()
    142 
    143     def test_poll3(self):
    144         # test int overflow
    145         pollster = select.poll()
    146         pollster.register(1)
    147 
    148         self.assertRaises(OverflowError, pollster.poll, 1L << 64)
    149 
    150         x = 2 + 3
    151         if x != 5:
    152             self.fail('Overflow must have occurred')
    153 
    154         pollster = select.poll()
    155         # Issue 15989
    156         self.assertRaises(OverflowError, pollster.register, 0,
    157                           _testcapi.SHRT_MAX + 1)
    158         self.assertRaises(OverflowError, pollster.register, 0,
    159                           _testcapi.USHRT_MAX + 1)
    160         self.assertRaises(OverflowError, pollster.poll, _testcapi.INT_MAX + 1)
    161         self.assertRaises(OverflowError, pollster.poll, _testcapi.UINT_MAX + 1)
    162 
    163 def test_main():
    164     run_unittest(PollTests)
    165 
    166 if __name__ == '__main__':
    167     test_main()
    168