Home | History | Annotate | Download | only in test
# Test cases for the select.poll() function
      2 
      3 import os
      4 import random
      5 import select
      6 try:
      7     import threading
      8 except ImportError:
      9     threading = None
     10 import time
     11 import unittest
     12 from test.test_support import TESTFN, run_unittest, reap_threads, cpython_only
     13 
     14 try:
     15     select.poll
     16 except AttributeError:
     17     raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
     18 
     19 
     20 def find_ready_matching(ready, flag):
     21     match = []
     22     for fd, mode in ready:
     23         if mode & flag:
     24             match.append(fd)
     25     return match
     26 
     27 class PollTests(unittest.TestCase):
     28 
     29     def test_poll1(self):
     30         # Basic functional test of poll object
     31         # Create a bunch of pipe and test that poll works with them.
     32 
     33         p = select.poll()
     34 
     35         NUM_PIPES = 12
     36         MSG = " This is a test."
     37         MSG_LEN = len(MSG)
     38         readers = []
     39         writers = []
     40         r2w = {}
     41         w2r = {}
     42 
     43         for i in range(NUM_PIPES):
     44             rd, wr = os.pipe()
     45             p.register(rd)
     46             p.modify(rd, select.POLLIN)
     47             p.register(wr, select.POLLOUT)
     48             readers.append(rd)
     49             writers.append(wr)
     50             r2w[rd] = wr
     51             w2r[wr] = rd
     52 
     53         bufs = []
     54 
     55         while writers:
     56             ready = p.poll()
     57             ready_writers = find_ready_matching(ready, select.POLLOUT)
     58             if not ready_writers:
     59                 raise RuntimeError, "no pipes ready for writing"
     60             wr = random.choice(ready_writers)
     61             os.write(wr, MSG)
     62 
     63             ready = p.poll()
     64             ready_readers = find_ready_matching(ready, select.POLLIN)
     65             if not ready_readers:
     66                 raise RuntimeError, "no pipes ready for reading"
     67             rd = random.choice(ready_readers)
     68             buf = os.read(rd, MSG_LEN)
     69             self.assertEqual(len(buf), MSG_LEN)
     70             bufs.append(buf)
     71             os.close(r2w[rd]) ; os.close( rd )
     72             p.unregister( r2w[rd] )
     73             p.unregister( rd )
     74             writers.remove(r2w[rd])
     75 
     76         self.assertEqual(bufs, [MSG] * NUM_PIPES)
     77 
     78     def poll_unit_tests(self):
     79         # returns NVAL for invalid file descriptor
     80         FD = 42
     81         try:
     82             os.close(FD)
     83         except OSError:
     84             pass
     85         p = select.poll()
     86         p.register(FD)
     87         r = p.poll()
     88         self.assertEqual(r[0], (FD, select.POLLNVAL))
     89 
     90         f = open(TESTFN, 'w')
     91         fd = f.fileno()
     92         p = select.poll()
     93         p.register(f)
     94         r = p.poll()
     95         self.assertEqual(r[0][0], fd)
     96         f.close()
     97         r = p.poll()
     98         self.assertEqual(r[0], (fd, select.POLLNVAL))
     99         os.unlink(TESTFN)
    100 
    101         # type error for invalid arguments
    102         p = select.poll()
    103         self.assertRaises(TypeError, p.register, p)
    104         self.assertRaises(TypeError, p.unregister, p)
    105 
    106         # can't unregister non-existent object
    107         p = select.poll()
    108         self.assertRaises(KeyError, p.unregister, 3)
    109 
    110         # Test error cases
    111         pollster = select.poll()
    112         class Nope:
    113             pass
    114 
    115         class Almost:
    116             def fileno(self):
    117                 return 'fileno'
    118 
    119         self.assertRaises(TypeError, pollster.register, Nope(), 0)
    120         self.assertRaises(TypeError, pollster.register, Almost(), 0)
    121 
    122     # Another test case for poll().  This is copied from the test case for
    123     # select(), modified to use poll() instead.
    124 
    125     def test_poll2(self):
    126         cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
    127         p = os.popen(cmd, 'r')
    128         pollster = select.poll()
    129         pollster.register( p, select.POLLIN )
    130         for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
    131             fdlist = pollster.poll(tout)
    132             if (fdlist == []):
    133                 continue
    134             fd, flags = fdlist[0]
    135             if flags & select.POLLHUP:
    136                 line = p.readline()
    137                 if line != "":
    138                     self.fail('error: pipe seems to be closed, but still returns data')
    139                 continue
    140 
    141             elif flags & select.POLLIN:
    142                 line = p.readline()
    143                 if not line:
    144                     break
    145                 continue
    146             else:
    147                 self.fail('Unexpected return value from select.poll: %s' % fdlist)
    148         p.close()
    149 
    150     def test_poll3(self):
    151         # test int overflow
    152         pollster = select.poll()
    153         pollster.register(1)
    154 
    155         self.assertRaises(OverflowError, pollster.poll, 1L << 64)
    156 
    157         x = 2 + 3
    158         if x != 5:
    159             self.fail('Overflow must have occurred')
    160 
    161         # Issues #15989, #17919
    162         self.assertRaises(OverflowError, pollster.register, 0, -1)
    163         self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
    164         self.assertRaises(OverflowError, pollster.modify, 1, -1)
    165         self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
    166 
    167     @cpython_only
    168     def test_poll_c_limits(self):
    169         from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
    170         pollster = select.poll()
    171         pollster.register(1)
    172 
    173         # Issues #15989, #17919
    174         self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
    175         self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
    176         self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
    177         self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
    178 
    179     @unittest.skipUnless(threading, 'Threading required for this test.')
    180     @reap_threads
    181     def test_threaded_poll(self):
    182         r, w = os.pipe()
    183         self.addCleanup(os.close, r)
    184         self.addCleanup(os.close, w)
    185         rfds = []
    186         for i in range(10):
    187             fd = os.dup(r)
    188             self.addCleanup(os.close, fd)
    189             rfds.append(fd)
    190         pollster = select.poll()
    191         for fd in rfds:
    192             pollster.register(fd, select.POLLIN)
    193 
    194         t = threading.Thread(target=pollster.poll)
    195         t.start()
    196         try:
    197             time.sleep(0.5)
    198             # trigger ufds array reallocation
    199             for fd in rfds:
    200                 pollster.unregister(fd)
    201             pollster.register(w, select.POLLOUT)
    202             self.assertRaises(RuntimeError, pollster.poll)
    203         finally:
    204             # and make the call to poll() from the thread return
    205             os.write(w, b'spam')
    206             t.join()
    207 
    208 
    209 def test_main():
    210     run_unittest(PollTests)
    211 
    212 if __name__ == '__main__':
    213     test_main()
    214