Home | History | Annotate | Download | only in test
      1 import base64
      2 import datetime
      3 import sys
      4 import time
      5 import unittest
      6 import xmlrpclib
      7 import SimpleXMLRPCServer
      8 import mimetools
      9 import httplib
     10 import socket
     11 import StringIO
     12 import os
     13 import re
     14 from test import test_support
     15 
     16 try:
     17     import threading
     18 except ImportError:
     19     threading = None
     20 
     21 try:
     22     import gzip
     23 except ImportError:
     24     gzip = None
     25 
     26 alist = [{'astring': 'foo (at] bar.baz.spam',
     27           'afloat': 7283.43,
     28           'anint': 2**20,
     29           'ashortlong': 2L,
     30           'anotherlist': ['.zyx.41'],
     31           'abase64': xmlrpclib.Binary("my dog has fleas"),
     32           'boolean': xmlrpclib.False,
     33           'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
     34           'datetime2': xmlrpclib.DateTime(
     35                         (2005, 02, 10, 11, 41, 23, 0, 1, -1)),
     36           'datetime3': xmlrpclib.DateTime(
     37                         datetime.datetime(2005, 02, 10, 11, 41, 23)),
     38           }]
     39 
     40 if test_support.have_unicode:
     41     alist[0].update({
     42           'unicode': test_support.u(r'\u4000\u6000\u8000'),
     43           test_support.u(r'ukey\u4000'): 'regular value',
     44     })
     45 
     46 class XMLRPCTestCase(unittest.TestCase):
     47 
     48     def test_dump_load(self):
     49         self.assertEqual(alist,
     50                          xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0])
     51 
     52     def test_dump_bare_datetime(self):
     53         # This checks that an unwrapped datetime.date object can be handled
     54         # by the marshalling code.  This can't be done via test_dump_load()
     55         # since with use_datetime set to 1 the unmarshaller would create
     56         # datetime objects for the 'datetime[123]' keys as well
     57         dt = datetime.datetime(2005, 02, 10, 11, 41, 23)
     58         s = xmlrpclib.dumps((dt,))
     59         (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
     60         self.assertEqual(newdt, dt)
     61         self.assertEqual(m, None)
     62 
     63         (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
     64         self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))
     65 
     66     def test_datetime_before_1900(self):
     67         # same as before but with a date before 1900
     68         dt = datetime.datetime(1, 02, 10, 11, 41, 23)
     69         s = xmlrpclib.dumps((dt,))
     70         (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
     71         self.assertEqual(newdt, dt)
     72         self.assertEqual(m, None)
     73 
     74         (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
     75         self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
     76 
     77     def test_cmp_datetime_DateTime(self):
     78         now = datetime.datetime.now()
     79         dt = xmlrpclib.DateTime(now.timetuple())
     80         self.assertTrue(dt == now)
     81         self.assertTrue(now == dt)
     82         then = now + datetime.timedelta(seconds=4)
     83         self.assertTrue(then >= dt)
     84         self.assertTrue(dt < then)
     85 
     86     def test_bug_1164912 (self):
     87         d = xmlrpclib.DateTime()
     88         ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
     89                                             methodresponse=True))
     90         self.assertIsInstance(new_d.value, str)
     91 
     92         # Check that the output of dumps() is still an 8-bit string
     93         s = xmlrpclib.dumps((new_d,), methodresponse=True)
     94         self.assertIsInstance(s, str)
     95 
     96     def test_newstyle_class(self):
     97         class T(object):
     98             pass
     99         t = T()
    100         t.x = 100
    101         t.y = "Hello"
    102         ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
    103         self.assertEqual(t2, t.__dict__)
    104 
    105     def test_dump_big_long(self):
    106         self.assertRaises(OverflowError, xmlrpclib.dumps, (2L**99,))
    107 
    108     def test_dump_bad_dict(self):
    109         self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
    110 
    111     def test_dump_recursive_seq(self):
    112         l = [1,2,3]
    113         t = [3,4,5,l]
    114         l.append(t)
    115         self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
    116 
    117     def test_dump_recursive_dict(self):
    118         d = {'1':1, '2':1}
    119         t = {'3':3, 'd':d}
    120         d['t'] = t
    121         self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
    122 
    123     def test_dump_big_int(self):
    124         if sys.maxint > 2L**31-1:
    125             self.assertRaises(OverflowError, xmlrpclib.dumps,
    126                               (int(2L**34),))
    127 
    128         xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
    129         self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,))
    130         self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,))
    131 
    132         def dummy_write(s):
    133             pass
    134 
    135         m = xmlrpclib.Marshaller()
    136         m.dump_int(xmlrpclib.MAXINT, dummy_write)
    137         m.dump_int(xmlrpclib.MININT, dummy_write)
    138         self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write)
    139         self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write)
    140 
    141 
    142     def test_dump_none(self):
    143         value = alist + [None]
    144         arg1 = (alist + [None],)
    145         strg = xmlrpclib.dumps(arg1, allow_none=True)
    146         self.assertEqual(value,
    147                          xmlrpclib.loads(strg)[0][0])
    148         self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
    149 
    150     @test_support.requires_unicode
    151     def test_dump_encoding(self):
    152         value = {test_support.u(r'key\u20ac\xa4'):
    153                  test_support.u(r'value\u20ac\xa4')}
    154         strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
    155         strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
    156         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    157 
    158         strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
    159                                methodresponse=True)
    160         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    161 
    162         methodname = test_support.u(r'method\u20ac\xa4')
    163         strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
    164                                methodname=methodname)
    165         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    166         self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
    167 
    168     @test_support.requires_unicode
    169     def test_default_encoding_issues(self):
    170         # SF bug #1115989: wrong decoding in '_stringify'
    171         utf8 = """<?xml version='1.0' encoding='iso-8859-1'?>
    172                   <params>
    173                     <param><value>
    174                       <string>abc \x95</string>
    175                       </value></param>
    176                     <param><value>
    177                       <struct>
    178                         <member>
    179                           <name>def \x96</name>
    180                           <value><string>ghi \x97</string></value>
    181                           </member>
    182                         </struct>
    183                       </value></param>
    184                   </params>
    185                   """
    186 
    187         # sys.setdefaultencoding() normally doesn't exist after site.py is
    188         # loaded.  Import a temporary fresh copy to get access to it
    189         # but then restore the original copy to avoid messing with
    190         # other potentially modified sys module attributes
    191         old_encoding = sys.getdefaultencoding()
    192         with test_support.CleanImport('sys'):
    193             import sys as temp_sys
    194             temp_sys.setdefaultencoding("iso-8859-1")
    195             try:
    196                 (s, d), m = xmlrpclib.loads(utf8)
    197             finally:
    198                 temp_sys.setdefaultencoding(old_encoding)
    199 
    200         items = d.items()
    201         if test_support.have_unicode:
    202             self.assertEqual(s, u"abc \x95")
    203             self.assertIsInstance(s, unicode)
    204             self.assertEqual(items, [(u"def \x96", u"ghi \x97")])
    205             self.assertIsInstance(items[0][0], unicode)
    206             self.assertIsInstance(items[0][1], unicode)
    207         else:
    208             self.assertEqual(s, "abc \xc2\x95")
    209             self.assertEqual(items, [("def \xc2\x96", "ghi \xc2\x97")])
    210 
    211     def test_loads_unsupported(self):
    212         ResponseError = xmlrpclib.ResponseError
    213         data = '<params><param><value><spam/></value></param></params>'
    214         self.assertRaises(ResponseError, xmlrpclib.loads, data)
    215         data = ('<params><param><value><array>'
    216                 '<value><spam/></value>'
    217                 '</array></value></param></params>')
    218         self.assertRaises(ResponseError, xmlrpclib.loads, data)
    219         data = ('<params><param><value><struct>'
    220                 '<member><name>a</name><value><spam/></value></member>'
    221                 '<member><name>b</name><value><spam/></value></member>'
    222                 '</struct></value></param></params>')
    223         self.assertRaises(ResponseError, xmlrpclib.loads, data)
    224 
    225 
    226 class HelperTestCase(unittest.TestCase):
    227     def test_escape(self):
    228         self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
    229         self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
    230         self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")
    231 
    232 class FaultTestCase(unittest.TestCase):
    233     def test_repr(self):
    234         f = xmlrpclib.Fault(42, 'Test Fault')
    235         self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
    236         self.assertEqual(repr(f), str(f))
    237 
    238     def test_dump_fault(self):
    239         f = xmlrpclib.Fault(42, 'Test Fault')
    240         s = xmlrpclib.dumps((f,))
    241         (newf,), m = xmlrpclib.loads(s)
    242         self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
    243         self.assertEqual(m, None)
    244 
    245         s = xmlrpclib.Marshaller().dumps(f)
    246         self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
    247 
    248 
    249 class DateTimeTestCase(unittest.TestCase):
    250     def test_default(self):
    251         t = xmlrpclib.DateTime()
    252 
    253     def test_time(self):
    254         d = 1181399930.036952
    255         t = xmlrpclib.DateTime(d)
    256         self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
    257 
    258     def test_time_tuple(self):
    259         d = (2007,6,9,10,38,50,5,160,0)
    260         t = xmlrpclib.DateTime(d)
    261         self.assertEqual(str(t), '20070609T10:38:50')
    262 
    263     def test_time_struct(self):
    264         d = time.localtime(1181399930.036952)
    265         t = xmlrpclib.DateTime(d)
    266         self.assertEqual(str(t),  time.strftime("%Y%m%dT%H:%M:%S", d))
    267 
    268     def test_datetime_datetime(self):
    269         d = datetime.datetime(2007,1,2,3,4,5)
    270         t = xmlrpclib.DateTime(d)
    271         self.assertEqual(str(t), '20070102T03:04:05')
    272 
    273     def test_repr(self):
    274         d = datetime.datetime(2007,1,2,3,4,5)
    275         t = xmlrpclib.DateTime(d)
    276         val ="<DateTime '20070102T03:04:05' at %x>" % id(t)
    277         self.assertEqual(repr(t), val)
    278 
    279     def test_decode(self):
    280         d = ' 20070908T07:11:13  '
    281         t1 = xmlrpclib.DateTime()
    282         t1.decode(d)
    283         tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
    284         self.assertEqual(t1, tref)
    285 
    286         t2 = xmlrpclib._datetime(d)
    287         self.assertEqual(t1, tref)
    288 
    289 class BinaryTestCase(unittest.TestCase):
    290     def test_default(self):
    291         t = xmlrpclib.Binary()
    292         self.assertEqual(str(t), '')
    293 
    294     def test_string(self):
    295         d = '\x01\x02\x03abc123\xff\xfe'
    296         t = xmlrpclib.Binary(d)
    297         self.assertEqual(str(t), d)
    298 
    299     def test_decode(self):
    300         d = '\x01\x02\x03abc123\xff\xfe'
    301         de = base64.encodestring(d)
    302         t1 = xmlrpclib.Binary()
    303         t1.decode(de)
    304         self.assertEqual(str(t1), d)
    305 
    306         t2 = xmlrpclib._binary(de)
    307         self.assertEqual(str(t2), d)
    308 
    309 
    310 ADDR = PORT = URL = None
    311 
    312 # The evt is set twice.  First when the server is ready to serve.
    313 # Second when the server has been shutdown.  The user must clear
    314 # the event after it has been set the first time to catch the second set.
    315 def http_server(evt, numrequests, requestHandler=None, encoding=None):
    316     class TestInstanceClass:
    317         def div(self, x, y):
    318             return x // y
    319 
    320         def _methodHelp(self, name):
    321             if name == 'div':
    322                 return 'This is the div function'
    323 
    324     def my_function():
    325         '''This is my function'''
    326         return True
    327 
    328     class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
    329         def get_request(self):
    330             # Ensure the socket is always non-blocking.  On Linux, socket
    331             # attributes are not inherited like they are on *BSD and Windows.
    332             s, port = self.socket.accept()
    333             s.setblocking(True)
    334             return s, port
    335 
    336     if not requestHandler:
    337         requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    338     serv = MyXMLRPCServer(("localhost", 0), requestHandler,
    339                           encoding=encoding,
    340                           logRequests=False, bind_and_activate=False)
    341     try:
    342         serv.socket.settimeout(3)
    343         serv.server_bind()
    344         global ADDR, PORT, URL
    345         ADDR, PORT = serv.socket.getsockname()
    346         #connect to IP address directly.  This avoids socket.create_connection()
    347         #trying to connect to "localhost" using all address families, which
    348         #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
    349         #on AF_INET only.
    350         URL = "http://%s:%d"%(ADDR, PORT)
    351         serv.server_activate()
    352         serv.register_introspection_functions()
    353         serv.register_multicall_functions()
    354         serv.register_function(pow)
    355         serv.register_function(lambda x,y: x+y, 'add')
    356         serv.register_function(lambda x: x, test_support.u(r't\xea\u0161t'))
    357         serv.register_function(my_function)
    358         serv.register_instance(TestInstanceClass())
    359         evt.set()
    360 
    361         # handle up to 'numrequests' requests
    362         while numrequests > 0:
    363             serv.handle_request()
    364             numrequests -= 1
    365 
    366     except socket.timeout:
    367         pass
    368     finally:
    369         serv.socket.close()
    370         PORT = None
    371         evt.set()
    372 
    373 def http_multi_server(evt, numrequests, requestHandler=None):
    374     class TestInstanceClass:
    375         def div(self, x, y):
    376             return x // y
    377 
    378         def _methodHelp(self, name):
    379             if name == 'div':
    380                 return 'This is the div function'
    381 
    382     def my_function():
    383         '''This is my function'''
    384         return True
    385 
    386     class MyXMLRPCServer(SimpleXMLRPCServer.MultiPathXMLRPCServer):
    387         def get_request(self):
    388             # Ensure the socket is always non-blocking.  On Linux, socket
    389             # attributes are not inherited like they are on *BSD and Windows.
    390             s, port = self.socket.accept()
    391             s.setblocking(True)
    392             return s, port
    393 
    394     if not requestHandler:
    395         requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    396     class MyRequestHandler(requestHandler):
    397         rpc_paths = []
    398 
    399     serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
    400                           logRequests=False, bind_and_activate=False)
    401     serv.socket.settimeout(3)
    402     serv.server_bind()
    403     try:
    404         global ADDR, PORT, URL
    405         ADDR, PORT = serv.socket.getsockname()
    406         #connect to IP address directly.  This avoids socket.create_connection()
    407         #trying to connect to "localhost" using all address families, which
    408         #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
    409         #on AF_INET only.
    410         URL = "http://%s:%d"%(ADDR, PORT)
    411         serv.server_activate()
    412         paths = ["/foo", "/foo/bar"]
    413         for path in paths:
    414             d = serv.add_dispatcher(path, SimpleXMLRPCServer.SimpleXMLRPCDispatcher())
    415             d.register_introspection_functions()
    416             d.register_multicall_functions()
    417         serv.get_dispatcher(paths[0]).register_function(pow)
    418         serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
    419         evt.set()
    420 
    421         # handle up to 'numrequests' requests
    422         while numrequests > 0:
    423             serv.handle_request()
    424             numrequests -= 1
    425 
    426     except socket.timeout:
    427         pass
    428     finally:
    429         serv.socket.close()
    430         PORT = None
    431         evt.set()
    432 
    433 # This function prevents errors like:
    434 #    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
    435 def is_unavailable_exception(e):
    436     '''Returns True if the given ProtocolError is the product of a server-side
    437        exception caused by the 'temporarily unavailable' response sometimes
    438        given by operations on non-blocking sockets.'''
    439 
    440     # sometimes we get a -1 error code and/or empty headers
    441     try:
    442         if e.errcode == -1 or e.headers is None:
    443             return True
    444         exc_mess = e.headers.get('X-exception')
    445     except AttributeError:
    446         # Ignore socket.errors here.
    447         exc_mess = str(e)
    448 
    449     if exc_mess and 'temporarily unavailable' in exc_mess.lower():
    450         return True
    451 
    452     return False
    453 
    454 @unittest.skipUnless(threading, 'Threading required for this test.')
    455 class BaseServerTestCase(unittest.TestCase):
    456     requestHandler = None
    457     request_count = 1
    458     threadFunc = staticmethod(http_server)
    459 
    460     def setUp(self):
    461         # enable traceback reporting
    462         SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    463 
    464         self.evt = threading.Event()
    465         # start server thread to handle requests
    466         serv_args = (self.evt, self.request_count, self.requestHandler)
    467         threading.Thread(target=self.threadFunc, args=serv_args).start()
    468 
    469         # wait for the server to be ready
    470         self.evt.wait(10)
    471         self.evt.clear()
    472 
    473     def tearDown(self):
    474         # wait on the server thread to terminate
    475         self.evt.wait(10)
    476 
    477         # disable traceback reporting
    478         SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
    479 
    480 # NOTE: The tests in SimpleServerTestCase will ignore failures caused by
    481 # "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
    482 # condition occurs infrequently on some platforms, frequently on others, and
    483 # is apparently caused by using SimpleXMLRPCServer with a non-blocking socket
    484 # If the server class is updated at some point in the future to handle this
    485 # situation more gracefully, these tests should be modified appropriately.
    486 
    487 class SimpleServerTestCase(BaseServerTestCase):
    488     def test_simple1(self):
    489         try:
    490             p = xmlrpclib.ServerProxy(URL)
    491             self.assertEqual(p.pow(6,8), 6**8)
    492         except (xmlrpclib.ProtocolError, socket.error), e:
    493             # ignore failures due to non-blocking socket 'unavailable' errors
    494             if not is_unavailable_exception(e):
    495                 # protocol error; provide additional information in test output
    496                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    497 
    498     @test_support.requires_unicode
    499     def test_nonascii(self):
    500         start_string = test_support.u(r'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t')
    501         end_string = test_support.u(r'h\N{LATIN SMALL LETTER O WITH HORN}n')
    502 
    503         try:
    504             p = xmlrpclib.ServerProxy(URL)
    505             self.assertEqual(p.add(start_string, end_string),
    506                              start_string + end_string)
    507         except (xmlrpclib.ProtocolError, socket.error) as e:
    508             # ignore failures due to non-blocking socket unavailable errors.
    509             if not is_unavailable_exception(e):
    510                 # protocol error; provide additional information in test output
    511                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    512 
    513     @test_support.requires_unicode
    514     def test_unicode_host(self):
    515         server = xmlrpclib.ServerProxy(u"http://%s:%d/RPC2"%(ADDR, PORT))
    516         self.assertEqual(server.add("a", u"\xe9"), u"a\xe9")
    517 
    518     @test_support.requires_unicode
    519     def test_client_encoding(self):
    520         start_string = unichr(0x20ac)
    521         end_string = unichr(0xa4)
    522 
    523         try:
    524             p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
    525             self.assertEqual(p.add(start_string, end_string),
    526                              start_string + end_string)
    527         except (xmlrpclib.ProtocolError, socket.error) as e:
    528             # ignore failures due to non-blocking socket unavailable errors.
    529             if not is_unavailable_exception(e):
    530                 # protocol error; provide additional information in test output
    531                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    532 
    533     @test_support.requires_unicode
    534     def test_nonascii_methodname(self):
    535         try:
    536             p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
    537             m = getattr(p, 't\xea\xa8t')
    538             self.assertEqual(m(42), 42)
    539         except (xmlrpclib.ProtocolError, socket.error) as e:
    540             # ignore failures due to non-blocking socket unavailable errors.
    541             if not is_unavailable_exception(e):
    542                 # protocol error; provide additional information in test output
    543                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    544 
    545     # [ch] The test 404 is causing lots of false alarms.
    546     def XXXtest_404(self):
    547         # send POST with httplib, it should return 404 header and
    548         # 'Not Found' message.
    549         conn = httplib.HTTPConnection(ADDR, PORT)
    550         conn.request('POST', '/this-is-not-valid')
    551         response = conn.getresponse()
    552         conn.close()
    553 
    554         self.assertEqual(response.status, 404)
    555         self.assertEqual(response.reason, 'Not Found')
    556 
    557     def test_introspection1(self):
    558         try:
    559             p = xmlrpclib.ServerProxy(URL)
    560             meth = p.system.listMethods()
    561             expected_methods = set(['pow', 'div', 'my_function', 'add',
    562                                     test_support.u(r't\xea\u0161t'),
    563                                     'system.listMethods', 'system.methodHelp',
    564                                     'system.methodSignature', 'system.multicall'])
    565             self.assertEqual(set(meth), expected_methods)
    566         except (xmlrpclib.ProtocolError, socket.error), e:
    567             # ignore failures due to non-blocking socket 'unavailable' errors
    568             if not is_unavailable_exception(e):
    569                 # protocol error; provide additional information in test output
    570                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    571 
    572     def test_introspection2(self):
    573         try:
    574             # test _methodHelp()
    575             p = xmlrpclib.ServerProxy(URL)
    576             divhelp = p.system.methodHelp('div')
    577             self.assertEqual(divhelp, 'This is the div function')
    578         except (xmlrpclib.ProtocolError, socket.error), e:
    579             # ignore failures due to non-blocking socket 'unavailable' errors
    580             if not is_unavailable_exception(e):
    581                 # protocol error; provide additional information in test output
    582                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    583 
    584     @unittest.skipIf(sys.flags.optimize >= 2,
    585                      "Docstrings are omitted with -O2 and above")
    586     def test_introspection3(self):
    587         try:
    588             # test native doc
    589             p = xmlrpclib.ServerProxy(URL)
    590             myfunction = p.system.methodHelp('my_function')
    591             self.assertEqual(myfunction, 'This is my function')
    592         except (xmlrpclib.ProtocolError, socket.error), e:
    593             # ignore failures due to non-blocking socket 'unavailable' errors
    594             if not is_unavailable_exception(e):
    595                 # protocol error; provide additional information in test output
    596                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    597 
    598     def test_introspection4(self):
    599         # the SimpleXMLRPCServer doesn't support signatures, but
    600         # at least check that we can try making the call
    601         try:
    602             p = xmlrpclib.ServerProxy(URL)
    603             divsig = p.system.methodSignature('div')
    604             self.assertEqual(divsig, 'signatures not supported')
    605         except (xmlrpclib.ProtocolError, socket.error), e:
    606             # ignore failures due to non-blocking socket 'unavailable' errors
    607             if not is_unavailable_exception(e):
    608                 # protocol error; provide additional information in test output
    609                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    610 
    611     def test_multicall(self):
    612         try:
    613             p = xmlrpclib.ServerProxy(URL)
    614             multicall = xmlrpclib.MultiCall(p)
    615             multicall.add(2,3)
    616             multicall.pow(6,8)
    617             multicall.div(127,42)
    618             add_result, pow_result, div_result = multicall()
    619             self.assertEqual(add_result, 2+3)
    620             self.assertEqual(pow_result, 6**8)
    621             self.assertEqual(div_result, 127//42)
    622         except (xmlrpclib.ProtocolError, socket.error), e:
    623             # ignore failures due to non-blocking socket 'unavailable' errors
    624             if not is_unavailable_exception(e):
    625                 # protocol error; provide additional information in test output
    626                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    627 
    628     def test_non_existing_multicall(self):
    629         try:
    630             p = xmlrpclib.ServerProxy(URL)
    631             multicall = xmlrpclib.MultiCall(p)
    632             multicall.this_is_not_exists()
    633             result = multicall()
    634 
    635             # result.results contains;
    636             # [{'faultCode': 1, 'faultString': '<type \'exceptions.Exception\'>:'
    637             #   'method "this_is_not_exists" is not supported'>}]
    638 
    639             self.assertEqual(result.results[0]['faultCode'], 1)
    640             self.assertEqual(result.results[0]['faultString'],
    641                 '<type \'exceptions.Exception\'>:method "this_is_not_exists" '
    642                 'is not supported')
    643         except (xmlrpclib.ProtocolError, socket.error), e:
    644             # ignore failures due to non-blocking socket 'unavailable' errors
    645             if not is_unavailable_exception(e):
    646                 # protocol error; provide additional information in test output
    647                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    648 
    649     def test_dotted_attribute(self):
    650         # Raises an AttributeError because private methods are not allowed.
    651         self.assertRaises(AttributeError,
    652                           SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add')
    653 
    654         self.assertTrue(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))
    655         # Get the test to run faster by sending a request with test_simple1.
    656         # This avoids waiting for the socket timeout.
    657         self.test_simple1()
    658 
    659     def test_partial_post(self):
    660         # Check that a partial POST doesn't make the server loop: issue #14001.
    661         conn = httplib.HTTPConnection(ADDR, PORT)
    662         conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
    663         conn.close()
    664 
    665 class SimpleServerEncodingTestCase(BaseServerTestCase):
    666     @staticmethod
    667     def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
    668         http_server(evt, numrequests, requestHandler, 'iso-8859-15')
    669 
    670     @test_support.requires_unicode
    671     def test_server_encoding(self):
    672         start_string = unichr(0x20ac)
    673         end_string = unichr(0xa4)
    674 
    675         try:
    676             p = xmlrpclib.ServerProxy(URL)
    677             self.assertEqual(p.add(start_string, end_string),
    678                              start_string + end_string)
    679         except (xmlrpclib.ProtocolError, socket.error) as e:
    680             # ignore failures due to non-blocking socket unavailable errors.
    681             if not is_unavailable_exception(e):
    682                 # protocol error; provide additional information in test output
    683                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    684 
    685 
    686 class MultiPathServerTestCase(BaseServerTestCase):
    687     threadFunc = staticmethod(http_multi_server)
    688     request_count = 2
    689     def test_path1(self):
    690         p = xmlrpclib.ServerProxy(URL+"/foo")
    691         self.assertEqual(p.pow(6,8), 6**8)
    692         self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
    693     def test_path2(self):
    694         p = xmlrpclib.ServerProxy(URL+"/foo/bar")
    695         self.assertEqual(p.add(6,8), 6+8)
    696         self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
    697 
    698 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
    699 #does indeed serve subsequent requests on the same connection
    700 class BaseKeepaliveServerTestCase(BaseServerTestCase):
    701     #a request handler that supports keep-alive and logs requests into a
    702     #class variable
    703     class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    704         parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    705         protocol_version = 'HTTP/1.1'
    706         myRequests = []
    707         def handle(self):
    708             self.myRequests.append([])
    709             self.reqidx = len(self.myRequests)-1
    710             return self.parentClass.handle(self)
    711         def handle_one_request(self):
    712             result = self.parentClass.handle_one_request(self)
    713             self.myRequests[self.reqidx].append(self.raw_requestline)
    714             return result
    715 
    716     requestHandler = RequestHandler
    717     def setUp(self):
    718         #clear request log
    719         self.RequestHandler.myRequests = []
    720         return BaseServerTestCase.setUp(self)
    721 
    722 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
    723 #does indeed serve subsequent requests on the same connection
    724 class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    725     def test_two(self):
    726         p = xmlrpclib.ServerProxy(URL)
    727         #do three requests.
    728         self.assertEqual(p.pow(6,8), 6**8)
    729         self.assertEqual(p.pow(6,8), 6**8)
    730         self.assertEqual(p.pow(6,8), 6**8)
    731 
    732         #they should have all been handled by a single request handler
    733         self.assertEqual(len(self.RequestHandler.myRequests), 1)
    734 
    735         #check that we did at least two (the third may be pending append
    736         #due to thread scheduling)
    737         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    738 
    739 #test special attribute access on the serverproxy, through the __call__
    740 #function.
    741 class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    742     #ask for two keepalive requests to be handled.
    743     request_count=2
    744 
    745     def test_close(self):
    746         p = xmlrpclib.ServerProxy(URL)
    747         #do some requests with close.
    748         self.assertEqual(p.pow(6,8), 6**8)
    749         self.assertEqual(p.pow(6,8), 6**8)
    750         self.assertEqual(p.pow(6,8), 6**8)
    751         p("close")() #this should trigger a new keep-alive request
    752         self.assertEqual(p.pow(6,8), 6**8)
    753         self.assertEqual(p.pow(6,8), 6**8)
    754         self.assertEqual(p.pow(6,8), 6**8)
    755 
    756         #they should have all been two request handlers, each having logged at least
    757         #two complete requests
    758         self.assertEqual(len(self.RequestHandler.myRequests), 2)
    759         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    760         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
    761 
    762     def test_transport(self):
    763         p = xmlrpclib.ServerProxy(URL)
    764         #do some requests with close.
    765         self.assertEqual(p.pow(6,8), 6**8)
    766         p("transport").close() #same as above, really.
    767         self.assertEqual(p.pow(6,8), 6**8)
    768         self.assertEqual(len(self.RequestHandler.myRequests), 2)
    769 
    770 #A test case that verifies that gzip encoding works in both directions
    771 #(for a request and the response)
    772 @unittest.skipUnless(gzip, 'gzip not available')
    773 class GzipServerTestCase(BaseServerTestCase):
    774     #a request handler that supports keep-alive and logs requests into a
    775     #class variable
    776     class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    777         parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    778         protocol_version = 'HTTP/1.1'
    779 
    780         def do_POST(self):
    781             #store content of last request in class
    782             self.__class__.content_length = int(self.headers["content-length"])
    783             return self.parentClass.do_POST(self)
    784     requestHandler = RequestHandler
    785 
    786     class Transport(xmlrpclib.Transport):
    787         #custom transport, stores the response length for our perusal
    788         fake_gzip = False
    789         def parse_response(self, response):
    790             self.response_length=int(response.getheader("content-length", 0))
    791             return xmlrpclib.Transport.parse_response(self, response)
    792 
    793         def send_content(self, connection, body):
    794             if self.fake_gzip:
    795                 #add a lone gzip header to induce decode error remotely
    796                 connection.putheader("Content-Encoding", "gzip")
    797             return xmlrpclib.Transport.send_content(self, connection, body)
    798 
    799     def setUp(self):
    800         BaseServerTestCase.setUp(self)
    801 
    802     def test_gzip_request(self):
    803         t = self.Transport()
    804         t.encode_threshold = None
    805         p = xmlrpclib.ServerProxy(URL, transport=t)
    806         self.assertEqual(p.pow(6,8), 6**8)
    807         a = self.RequestHandler.content_length
    808         t.encode_threshold = 0 #turn on request encoding
    809         self.assertEqual(p.pow(6,8), 6**8)
    810         b = self.RequestHandler.content_length
    811         self.assertTrue(a>b)
    812 
    813     def test_bad_gzip_request(self):
    814         t = self.Transport()
    815         t.encode_threshold = None
    816         t.fake_gzip = True
    817         p = xmlrpclib.ServerProxy(URL, transport=t)
    818         cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError,
    819                                      re.compile(r"\b400\b"))
    820         with cm:
    821             p.pow(6, 8)
    822 
    823     def test_gzip_response(self):
    824         t = self.Transport()
    825         p = xmlrpclib.ServerProxy(URL, transport=t)
    826         old = self.requestHandler.encode_threshold
    827         self.requestHandler.encode_threshold = None #no encoding
    828         self.assertEqual(p.pow(6,8), 6**8)
    829         a = t.response_length
    830         self.requestHandler.encode_threshold = 0 #always encode
    831         self.assertEqual(p.pow(6,8), 6**8)
    832         b = t.response_length
    833         self.requestHandler.encode_threshold = old
    834         self.assertTrue(a>b)
    835 
    836     def test_gzip_decode_limit(self):
    837         max_gzip_decode = 20 * 1024 * 1024
    838         data = '\0' * max_gzip_decode
    839         encoded = xmlrpclib.gzip_encode(data)
    840         decoded = xmlrpclib.gzip_decode(encoded)
    841         self.assertEqual(len(decoded), max_gzip_decode)
    842 
    843         data = '\0' * (max_gzip_decode + 1)
    844         encoded = xmlrpclib.gzip_encode(data)
    845 
    846         with self.assertRaisesRegexp(ValueError,
    847                                      "max gzipped payload length exceeded"):
    848             xmlrpclib.gzip_decode(encoded)
    849 
    850         xmlrpclib.gzip_decode(encoded, max_decode=-1)
    851 
    852 
    853 #Test special attributes of the ServerProxy object
    854 class ServerProxyTestCase(unittest.TestCase):
    855     def setUp(self):
    856         unittest.TestCase.setUp(self)
    857         if threading:
    858             self.url = URL
    859         else:
    860             # Without threading, http_server() and http_multi_server() will not
    861             # be executed and URL is still equal to None. 'http://' is a just
    862             # enough to choose the scheme (HTTP)
    863             self.url = 'http://'
    864 
    865     def test_close(self):
    866         p = xmlrpclib.ServerProxy(self.url)
    867         self.assertEqual(p('close')(), None)
    868 
    869     def test_transport(self):
    870         t = xmlrpclib.Transport()
    871         p = xmlrpclib.ServerProxy(self.url, transport=t)
    872         self.assertEqual(p('transport'), t)
    873 
    874 # This is a contrived way to make a failure occur on the server side
    875 # in order to test the _send_traceback_header flag on the server
    876 class FailingMessageClass(mimetools.Message):
    877     def __getitem__(self, key):
    878         key = key.lower()
    879         if key == 'content-length':
    880             return 'I am broken'
    881         return mimetools.Message.__getitem__(self, key)
    882 
    883 
    884 @unittest.skipUnless(threading, 'Threading required for this test.')
    885 class FailingServerTestCase(unittest.TestCase):
    886     def setUp(self):
    887         self.evt = threading.Event()
    888         # start server thread to handle requests
    889         serv_args = (self.evt, 1)
    890         threading.Thread(target=http_server, args=serv_args).start()
    891 
    892         # wait for the server to be ready
    893         self.evt.wait()
    894         self.evt.clear()
    895 
    896     def tearDown(self):
    897         # wait on the server thread to terminate
    898         self.evt.wait()
    899         # reset flag
    900         SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
    901         # reset message class
    902         SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message
    903 
    904     def test_basic(self):
    905         # check that flag is false by default
    906         flagval = SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header
    907         self.assertEqual(flagval, False)
    908 
    909         # enable traceback reporting
    910         SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    911 
    912         # test a call that shouldn't fail just as a smoke test
    913         try:
    914             p = xmlrpclib.ServerProxy(URL)
    915             self.assertEqual(p.pow(6,8), 6**8)
    916         except (xmlrpclib.ProtocolError, socket.error), e:
    917             # ignore failures due to non-blocking socket 'unavailable' errors
    918             if not is_unavailable_exception(e):
    919                 # protocol error; provide additional information in test output
    920                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    921 
    922     def test_fail_no_info(self):
    923         # use the broken message class
    924         SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    925 
    926         try:
    927             p = xmlrpclib.ServerProxy(URL)
    928             p.pow(6,8)
    929         except (xmlrpclib.ProtocolError, socket.error), e:
    930             # ignore failures due to non-blocking socket 'unavailable' errors
    931             if not is_unavailable_exception(e) and hasattr(e, "headers"):
    932                 # The two server-side error headers shouldn't be sent back in this case
    933                 self.assertTrue(e.headers.get("X-exception") is None)
    934                 self.assertTrue(e.headers.get("X-traceback") is None)
    935         else:
    936             self.fail('ProtocolError not raised')
    937 
    938     def test_fail_with_info(self):
    939         # use the broken message class
    940         SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    941 
    942         # Check that errors in the server send back exception/traceback
    943         # info when flag is set
    944         SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    945 
    946         try:
    947             p = xmlrpclib.ServerProxy(URL)
    948             p.pow(6,8)
    949         except (xmlrpclib.ProtocolError, socket.error), e:
    950             # ignore failures due to non-blocking socket 'unavailable' errors
    951             if not is_unavailable_exception(e) and hasattr(e, "headers"):
    952                 # We should get error info in the response
    953                 expected_err = "invalid literal for int() with base 10: 'I am broken'"
    954                 self.assertEqual(e.headers.get("x-exception"), expected_err)
    955                 self.assertTrue(e.headers.get("x-traceback") is not None)
    956         else:
    957             self.fail('ProtocolError not raised')
    958 
    959 class CGIHandlerTestCase(unittest.TestCase):
    960     def setUp(self):
    961         self.cgi = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()
    962 
    963     def tearDown(self):
    964         self.cgi = None
    965 
    966     def test_cgi_get(self):
    967         with test_support.EnvironmentVarGuard() as env:
    968             env['REQUEST_METHOD'] = 'GET'
    969             # if the method is GET and no request_text is given, it runs handle_get
    970             # get sysout output
    971             with test_support.captured_stdout() as data_out:
    972                 self.cgi.handle_request()
    973 
    974             # parse Status header
    975             data_out.seek(0)
    976             handle = data_out.read()
    977             status = handle.split()[1]
    978             message = ' '.join(handle.split()[2:4])
    979 
    980             self.assertEqual(status, '400')
    981             self.assertEqual(message, 'Bad Request')
    982 
    983 
    984     def test_cgi_xmlrpc_response(self):
    985         data = """<?xml version='1.0'?>
    986         <methodCall>
    987             <methodName>test_method</methodName>
    988             <params>
    989                 <param>
    990                     <value><string>foo</string></value>
    991                 </param>
    992                 <param>
    993                     <value><string>bar</string></value>
    994                 </param>
    995             </params>
    996         </methodCall>
    997         """
    998 
    999         with test_support.EnvironmentVarGuard() as env, \
   1000              test_support.captured_stdout() as data_out, \
   1001              test_support.captured_stdin() as data_in:
   1002             data_in.write(data)
   1003             data_in.seek(0)
   1004             env['CONTENT_LENGTH'] = str(len(data))
   1005             self.cgi.handle_request()
   1006         data_out.seek(0)
   1007 
   1008         # will respond exception, if so, our goal is achieved ;)
   1009         handle = data_out.read()
   1010 
   1011         # start with 44th char so as not to get http header, we just need only xml
   1012         self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
   1013 
   1014         # Also test the content-length returned  by handle_request
   1015         # Using the same test method inorder to avoid all the datapassing
   1016         # boilerplate code.
   1017         # Test for bug: http://bugs.python.org/issue5040
   1018 
   1019         content = handle[handle.find("<?xml"):]
   1020 
   1021         self.assertEqual(
   1022             int(re.search('Content-Length: (\d+)', handle).group(1)),
   1023             len(content))
   1024 
   1025 
   1026 class FakeSocket:
   1027 
   1028     def __init__(self):
   1029         self.data = StringIO.StringIO()
   1030 
   1031     def send(self, buf):
   1032         self.data.write(buf)
   1033         return len(buf)
   1034 
   1035     def sendall(self, buf):
   1036         self.data.write(buf)
   1037 
   1038     def getvalue(self):
   1039         return self.data.getvalue()
   1040 
   1041     def makefile(self, x='r', y=-1):
   1042         raise RuntimeError
   1043 
   1044     def close(self):
   1045         pass
   1046 
   1047 class FakeTransport(xmlrpclib.Transport):
   1048     """A Transport instance that records instead of sending a request.
   1049 
   1050     This class replaces the actual socket used by httplib with a
   1051     FakeSocket object that records the request.  It doesn't provide a
   1052     response.
   1053     """
   1054 
   1055     def make_connection(self, host):
   1056         conn = xmlrpclib.Transport.make_connection(self, host)
   1057         conn.sock = self.fake_socket = FakeSocket()
   1058         return conn
   1059 
   1060 class TransportSubclassTestCase(unittest.TestCase):
   1061 
   1062     def issue_request(self, transport_class):
   1063         """Return an HTTP request made via transport_class."""
   1064         transport = transport_class()
   1065         proxy = xmlrpclib.ServerProxy("http://example.com/",
   1066                                       transport=transport)
   1067         try:
   1068             proxy.pow(6, 8)
   1069         except RuntimeError:
   1070             return transport.fake_socket.getvalue()
   1071         return None
   1072 
   1073     def test_custom_user_agent(self):
   1074         class TestTransport(FakeTransport):
   1075 
   1076             def send_user_agent(self, conn):
   1077                 xmlrpclib.Transport.send_user_agent(self, conn)
   1078                 conn.putheader("X-Test", "test_custom_user_agent")
   1079 
   1080         req = self.issue_request(TestTransport)
   1081         self.assertIn("X-Test: test_custom_user_agent\r\n", req)
   1082 
   1083     def test_send_host(self):
   1084         class TestTransport(FakeTransport):
   1085 
   1086             def send_host(self, conn, host):
   1087                 xmlrpclib.Transport.send_host(self, conn, host)
   1088                 conn.putheader("X-Test", "test_send_host")
   1089 
   1090         req = self.issue_request(TestTransport)
   1091         self.assertIn("X-Test: test_send_host\r\n", req)
   1092 
   1093     def test_send_request(self):
   1094         class TestTransport(FakeTransport):
   1095 
   1096             def send_request(self, conn, url, body):
   1097                 xmlrpclib.Transport.send_request(self, conn, url, body)
   1098                 conn.putheader("X-Test", "test_send_request")
   1099 
   1100         req = self.issue_request(TestTransport)
   1101         self.assertIn("X-Test: test_send_request\r\n", req)
   1102 
   1103     def test_send_content(self):
   1104         class TestTransport(FakeTransport):
   1105 
   1106             def send_content(self, conn, body):
   1107                 conn.putheader("X-Test", "test_send_content")
   1108                 xmlrpclib.Transport.send_content(self, conn, body)
   1109 
   1110         req = self.issue_request(TestTransport)
   1111         self.assertIn("X-Test: test_send_content\r\n", req)
   1112 
   1113 @test_support.reap_threads
   1114 def test_main():
   1115     xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
   1116          BinaryTestCase, FaultTestCase, TransportSubclassTestCase]
   1117     xmlrpc_tests.append(SimpleServerTestCase)
   1118     xmlrpc_tests.append(SimpleServerEncodingTestCase)
   1119     xmlrpc_tests.append(KeepaliveServerTestCase1)
   1120     xmlrpc_tests.append(KeepaliveServerTestCase2)
   1121     xmlrpc_tests.append(GzipServerTestCase)
   1122     xmlrpc_tests.append(MultiPathServerTestCase)
   1123     xmlrpc_tests.append(ServerProxyTestCase)
   1124     xmlrpc_tests.append(FailingServerTestCase)
   1125     xmlrpc_tests.append(CGIHandlerTestCase)
   1126 
   1127     test_support.run_unittest(*xmlrpc_tests)
   1128 
   1129 if __name__ == "__main__":
   1130     test_main()
   1131