Home | History | Annotate | Download | only in test
      1 import base64
      2 import datetime
      3 import decimal
      4 import sys
      5 import time
      6 import unittest
      7 from unittest import mock
      8 import xmlrpc.client as xmlrpclib
      9 import xmlrpc.server
     10 import http.client
     11 import http, http.server
     12 import socket
     13 import re
     14 import io
     15 import contextlib
     16 from test import support
     17 
     18 try:
     19     import gzip
     20 except ImportError:
     21     gzip = None
     22 try:
     23     import threading
     24 except ImportError:
     25     threading = None
     26 
# Sample payload shared by the round-trip tests: a one-element list holding
# a struct whose values cover strings, floats, ints, a nested list, the three
# binary flavours (Binary, bytes, bytearray), a boolean, non-ASCII text in
# both a value and a key, and DateTime objects built from an ISO string, a
# time tuple and a datetime.datetime.
# NOTE(review): '(at]' in 'astring' looks like an obfuscated '@' introduced
# by whatever exported this listing -- confirm against the upstream file.
alist = [{'astring': 'foo (at] bar.baz.spam',
          'afloat': 7283.43,
          'anint': 2**20,
          'ashortlong': 2,
          'anotherlist': ['.zyx.41'],
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
          'boolean': False,
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
          'datetime3': xmlrpclib.DateTime(
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
          }]
     44 
     45 class XMLRPCTestCase(unittest.TestCase):
     46 
     47     def test_dump_load(self):
     48         dump = xmlrpclib.dumps((alist,))
     49         load = xmlrpclib.loads(dump)
     50         self.assertEqual(alist, load[0][0])
     51 
     52     def test_dump_bare_datetime(self):
     53         # This checks that an unwrapped datetime.date object can be handled
     54         # by the marshalling code.  This can't be done via test_dump_load()
     55         # since with use_builtin_types set to 1 the unmarshaller would create
     56         # datetime objects for the 'datetime[123]' keys as well
     57         dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
     58         self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
     59         s = xmlrpclib.dumps((dt,))
     60 
     61         result, m = xmlrpclib.loads(s, use_builtin_types=True)
     62         (newdt,) = result
     63         self.assertEqual(newdt, dt)
     64         self.assertIs(type(newdt), datetime.datetime)
     65         self.assertIsNone(m)
     66 
     67         result, m = xmlrpclib.loads(s, use_builtin_types=False)
     68         (newdt,) = result
     69         self.assertEqual(newdt, dt)
     70         self.assertIs(type(newdt), xmlrpclib.DateTime)
     71         self.assertIsNone(m)
     72 
     73         result, m = xmlrpclib.loads(s, use_datetime=True)
     74         (newdt,) = result
     75         self.assertEqual(newdt, dt)
     76         self.assertIs(type(newdt), datetime.datetime)
     77         self.assertIsNone(m)
     78 
     79         result, m = xmlrpclib.loads(s, use_datetime=False)
     80         (newdt,) = result
     81         self.assertEqual(newdt, dt)
     82         self.assertIs(type(newdt), xmlrpclib.DateTime)
     83         self.assertIsNone(m)
     84 
     85 
     86     def test_datetime_before_1900(self):
     87         # same as before but with a date before 1900
     88         dt = datetime.datetime(1,  2, 10, 11, 41, 23)
     89         self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
     90         s = xmlrpclib.dumps((dt,))
     91 
     92         result, m = xmlrpclib.loads(s, use_builtin_types=True)
     93         (newdt,) = result
     94         self.assertEqual(newdt, dt)
     95         self.assertIs(type(newdt), datetime.datetime)
     96         self.assertIsNone(m)
     97 
     98         result, m = xmlrpclib.loads(s, use_builtin_types=False)
     99         (newdt,) = result
    100         self.assertEqual(newdt, dt)
    101         self.assertIs(type(newdt), xmlrpclib.DateTime)
    102         self.assertIsNone(m)
    103 
    104     def test_bug_1164912 (self):
    105         d = xmlrpclib.DateTime()
    106         ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
    107                                             methodresponse=True))
    108         self.assertIsInstance(new_d.value, str)
    109 
    110         # Check that the output of dumps() is still an 8-bit string
    111         s = xmlrpclib.dumps((new_d,), methodresponse=True)
    112         self.assertIsInstance(s, str)
    113 
    114     def test_newstyle_class(self):
    115         class T(object):
    116             pass
    117         t = T()
    118         t.x = 100
    119         t.y = "Hello"
    120         ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
    121         self.assertEqual(t2, t.__dict__)
    122 
    123     def test_dump_big_long(self):
    124         self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
    125 
    126     def test_dump_bad_dict(self):
    127         self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
    128 
    129     def test_dump_recursive_seq(self):
    130         l = [1,2,3]
    131         t = [3,4,5,l]
    132         l.append(t)
    133         self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
    134 
    135     def test_dump_recursive_dict(self):
    136         d = {'1':1, '2':1}
    137         t = {'3':3, 'd':d}
    138         d['t'] = t
    139         self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
    140 
    141     def test_dump_big_int(self):
    142         if sys.maxsize > 2**31-1:
    143             self.assertRaises(OverflowError, xmlrpclib.dumps,
    144                               (int(2**34),))
    145 
    146         xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
    147         self.assertRaises(OverflowError, xmlrpclib.dumps,
    148                           (xmlrpclib.MAXINT+1,))
    149         self.assertRaises(OverflowError, xmlrpclib.dumps,
    150                           (xmlrpclib.MININT-1,))
    151 
    152         def dummy_write(s):
    153             pass
    154 
    155         m = xmlrpclib.Marshaller()
    156         m.dump_int(xmlrpclib.MAXINT, dummy_write)
    157         m.dump_int(xmlrpclib.MININT, dummy_write)
    158         self.assertRaises(OverflowError, m.dump_int,
    159                           xmlrpclib.MAXINT+1, dummy_write)
    160         self.assertRaises(OverflowError, m.dump_int,
    161                           xmlrpclib.MININT-1, dummy_write)
    162 
    163     def test_dump_double(self):
    164         xmlrpclib.dumps((float(2 ** 34),))
    165         xmlrpclib.dumps((float(xmlrpclib.MAXINT),
    166                          float(xmlrpclib.MININT)))
    167         xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
    168                          float(xmlrpclib.MININT - 42)))
    169 
    170         def dummy_write(s):
    171             pass
    172 
    173         m = xmlrpclib.Marshaller()
    174         m.dump_double(xmlrpclib.MAXINT, dummy_write)
    175         m.dump_double(xmlrpclib.MININT, dummy_write)
    176         m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
    177         m.dump_double(xmlrpclib.MININT - 42, dummy_write)
    178 
    179     def test_dump_none(self):
    180         value = alist + [None]
    181         arg1 = (alist + [None],)
    182         strg = xmlrpclib.dumps(arg1, allow_none=True)
    183         self.assertEqual(value,
    184                           xmlrpclib.loads(strg)[0][0])
    185         self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
    186 
    187     def test_dump_encoding(self):
    188         value = {'key\u20ac\xa4':
    189                  'value\u20ac\xa4'}
    190         strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
    191         strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
    192         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    193         strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
    194         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    195 
    196         strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
    197                                methodresponse=True)
    198         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    199         strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
    200         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    201 
    202         methodname = 'method\u20ac\xa4'
    203         strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
    204                                methodname=methodname)
    205         self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
    206         self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
    207 
    208     def test_dump_bytes(self):
    209         sample = b"my dog has fleas"
    210         self.assertEqual(sample, xmlrpclib.Binary(sample))
    211         for type_ in bytes, bytearray, xmlrpclib.Binary:
    212             value = type_(sample)
    213             s = xmlrpclib.dumps((value,))
    214 
    215             result, m = xmlrpclib.loads(s, use_builtin_types=True)
    216             (newvalue,) = result
    217             self.assertEqual(newvalue, sample)
    218             self.assertIs(type(newvalue), bytes)
    219             self.assertIsNone(m)
    220 
    221             result, m = xmlrpclib.loads(s, use_builtin_types=False)
    222             (newvalue,) = result
    223             self.assertEqual(newvalue, sample)
    224             self.assertIs(type(newvalue), xmlrpclib.Binary)
    225             self.assertIsNone(m)
    226 
    227     def test_loads_unsupported(self):
    228         ResponseError = xmlrpclib.ResponseError
    229         data = '<params><param><value><spam/></value></param></params>'
    230         self.assertRaises(ResponseError, xmlrpclib.loads, data)
    231         data = ('<params><param><value><array>'
    232                 '<value><spam/></value>'
    233                 '</array></value></param></params>')
    234         self.assertRaises(ResponseError, xmlrpclib.loads, data)
    235         data = ('<params><param><value><struct>'
    236                 '<member><name>a</name><value><spam/></value></member>'
    237                 '<member><name>b</name><value><spam/></value></member>'
    238                 '</struct></value></param></params>')
    239         self.assertRaises(ResponseError, xmlrpclib.loads, data)
    240 
    241     def check_loads(self, s, value, **kwargs):
    242         dump = '<params><param><value>%s</value></param></params>' % s
    243         result, m = xmlrpclib.loads(dump, **kwargs)
    244         (newvalue,) = result
    245         self.assertEqual(newvalue, value)
    246         self.assertIs(type(newvalue), type(value))
    247         self.assertIsNone(m)
    248 
    249     def test_load_standard_types(self):
    250         check = self.check_loads
    251         check('string', 'string')
    252         check('<string>string</string>', 'string')
    253         check('<string> string</string>', ' string')
    254         check('<int>2056183947</int>', 2056183947)
    255         check('<int>-2056183947</int>', -2056183947)
    256         check('<i4>2056183947</i4>', 2056183947)
    257         check('<double>46093.78125</double>', 46093.78125)
    258         check('<boolean>0</boolean>', False)
    259         check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
    260               xmlrpclib.Binary(b'\x00byte string\xff'))
    261         check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
    262               b'\x00byte string\xff', use_builtin_types=True)
    263         check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
    264               xmlrpclib.DateTime('20050210T11:41:23'))
    265         check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
    266               datetime.datetime(2005, 2, 10, 11, 41, 23),
    267               use_builtin_types=True)
    268         check('<array><data>'
    269               '<value><int>1</int></value><value><int>2</int></value>'
    270               '</data></array>', [1, 2])
    271         check('<struct>'
    272               '<member><name>b</name><value><int>2</int></value></member>'
    273               '<member><name>a</name><value><int>1</int></value></member>'
    274               '</struct>', {'a': 1, 'b': 2})
    275 
    276     def test_load_extension_types(self):
    277         check = self.check_loads
    278         check('<nil/>', None)
    279         check('<ex:nil/>', None)
    280         check('<i1>205</i1>', 205)
    281         check('<i2>20561</i2>', 20561)
    282         check('<i8>9876543210</i8>', 9876543210)
    283         check('<biginteger>98765432100123456789</biginteger>',
    284               98765432100123456789)
    285         check('<float>93.78125</float>', 93.78125)
    286         check('<bigdecimal>9876543210.0123456789</bigdecimal>',
    287               decimal.Decimal('9876543210.0123456789'))
    288 
    289     def test_get_host_info(self):
    290         # see bug #3613, this raised a TypeError
    291         transp = xmlrpc.client.Transport()
    292         self.assertEqual(transp.get_host_info("user (at] host.tld"),
    293                           ('host.tld',
    294                            [('Authorization', 'Basic dXNlcg==')], {}))
    295 
    296     def test_ssl_presence(self):
    297         try:
    298             import ssl
    299         except ImportError:
    300             has_ssl = False
    301         else:
    302             has_ssl = True
    303         try:
    304             xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
    305         except NotImplementedError:
    306             self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
    307         except OSError:
    308             self.assertTrue(has_ssl)
    309 
    310     @unittest.skipUnless(threading, "Threading required for this test.")
    311     def test_keepalive_disconnect(self):
    312         class RequestHandler(http.server.BaseHTTPRequestHandler):
    313             protocol_version = "HTTP/1.1"
    314             handled = False
    315 
    316             def do_POST(self):
    317                 length = int(self.headers.get("Content-Length"))
    318                 self.rfile.read(length)
    319                 if self.handled:
    320                     self.close_connection = True
    321                     return
    322                 response = xmlrpclib.dumps((5,), methodresponse=True)
    323                 response = response.encode()
    324                 self.send_response(http.HTTPStatus.OK)
    325                 self.send_header("Content-Length", len(response))
    326                 self.end_headers()
    327                 self.wfile.write(response)
    328                 self.handled = True
    329                 self.close_connection = False
    330 
    331         def run_server():
    332             server.socket.settimeout(float(1))  # Don't hang if client fails
    333             server.handle_request()  # First request and attempt at second
    334             server.handle_request()  # Retried second request
    335 
    336         server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
    337         self.addCleanup(server.server_close)
    338         thread = threading.Thread(target=run_server)
    339         thread.start()
    340         self.addCleanup(thread.join)
    341         url = "http://{}:{}/".format(*server.server_address)
    342         with xmlrpclib.ServerProxy(url) as p:
    343             self.assertEqual(p.method(), 5)
    344             self.assertEqual(p.method(), 5)
    345 
    346 class HelperTestCase(unittest.TestCase):
    347     def test_escape(self):
    348         self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
    349         self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
    350         self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")
    351 
    352 class FaultTestCase(unittest.TestCase):
    353     def test_repr(self):
    354         f = xmlrpclib.Fault(42, 'Test Fault')
    355         self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
    356         self.assertEqual(repr(f), str(f))
    357 
    358     def test_dump_fault(self):
    359         f = xmlrpclib.Fault(42, 'Test Fault')
    360         s = xmlrpclib.dumps((f,))
    361         (newf,), m = xmlrpclib.loads(s)
    362         self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
    363         self.assertEqual(m, None)
    364 
    365         s = xmlrpclib.Marshaller().dumps(f)
    366         self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
    367 
    368     def test_dotted_attribute(self):
    369         # this will raise AttributeError because code don't want us to use
    370         # private methods
    371         self.assertRaises(AttributeError,
    372                           xmlrpc.server.resolve_dotted_attribute, str, '__add')
    373         self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
    374 
    375 class DateTimeTestCase(unittest.TestCase):
    376     def test_default(self):
    377         with mock.patch('time.localtime') as localtime_mock:
    378             time_struct = time.struct_time(
    379                 [2013, 7, 15, 0, 24, 49, 0, 196, 0])
    380             localtime_mock.return_value = time_struct
    381             localtime = time.localtime()
    382             t = xmlrpclib.DateTime()
    383             self.assertEqual(str(t),
    384                              time.strftime("%Y%m%dT%H:%M:%S", localtime))
    385 
    386     def test_time(self):
    387         d = 1181399930.036952
    388         t = xmlrpclib.DateTime(d)
    389         self.assertEqual(str(t),
    390                          time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
    391 
    392     def test_time_tuple(self):
    393         d = (2007,6,9,10,38,50,5,160,0)
    394         t = xmlrpclib.DateTime(d)
    395         self.assertEqual(str(t), '20070609T10:38:50')
    396 
    397     def test_time_struct(self):
    398         d = time.localtime(1181399930.036952)
    399         t = xmlrpclib.DateTime(d)
    400         self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
    401 
    402     def test_datetime_datetime(self):
    403         d = datetime.datetime(2007,1,2,3,4,5)
    404         t = xmlrpclib.DateTime(d)
    405         self.assertEqual(str(t), '20070102T03:04:05')
    406 
    407     def test_repr(self):
    408         d = datetime.datetime(2007,1,2,3,4,5)
    409         t = xmlrpclib.DateTime(d)
    410         val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
    411         self.assertEqual(repr(t), val)
    412 
    413     def test_decode(self):
    414         d = ' 20070908T07:11:13  '
    415         t1 = xmlrpclib.DateTime()
    416         t1.decode(d)
    417         tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
    418         self.assertEqual(t1, tref)
    419 
    420         t2 = xmlrpclib._datetime(d)
    421         self.assertEqual(t2, tref)
    422 
    423     def test_comparison(self):
    424         now = datetime.datetime.now()
    425         dtime = xmlrpclib.DateTime(now.timetuple())
    426 
    427         # datetime vs. DateTime
    428         self.assertTrue(dtime == now)
    429         self.assertTrue(now == dtime)
    430         then = now + datetime.timedelta(seconds=4)
    431         self.assertTrue(then >= dtime)
    432         self.assertTrue(dtime < then)
    433 
    434         # str vs. DateTime
    435         dstr = now.strftime("%Y%m%dT%H:%M:%S")
    436         self.assertTrue(dtime == dstr)
    437         self.assertTrue(dstr == dtime)
    438         dtime_then = xmlrpclib.DateTime(then.timetuple())
    439         self.assertTrue(dtime_then >= dstr)
    440         self.assertTrue(dstr < dtime_then)
    441 
    442         # some other types
    443         dbytes = dstr.encode('ascii')
    444         dtuple = now.timetuple()
    445         with self.assertRaises(TypeError):
    446             dtime == 1970
    447         with self.assertRaises(TypeError):
    448             dtime != dbytes
    449         with self.assertRaises(TypeError):
    450             dtime == bytearray(dbytes)
    451         with self.assertRaises(TypeError):
    452             dtime != dtuple
    453         with self.assertRaises(TypeError):
    454             dtime < float(1970)
    455         with self.assertRaises(TypeError):
    456             dtime > dbytes
    457         with self.assertRaises(TypeError):
    458             dtime <= bytearray(dbytes)
    459         with self.assertRaises(TypeError):
    460             dtime >= dtuple
    461 
    462 class BinaryTestCase(unittest.TestCase):
    463 
    464     # XXX What should str(Binary(b"\xff")) return?  I'm chosing "\xff"
    465     # for now (i.e. interpreting the binary data as Latin-1-encoded
    466     # text).  But this feels very unsatisfactory.  Perhaps we should
    467     # only define repr(), and return r"Binary(b'\xff')" instead?
    468 
    469     def test_default(self):
    470         t = xmlrpclib.Binary()
    471         self.assertEqual(str(t), '')
    472 
    473     def test_string(self):
    474         d = b'\x01\x02\x03abc123\xff\xfe'
    475         t = xmlrpclib.Binary(d)
    476         self.assertEqual(str(t), str(d, "latin-1"))
    477 
    478     def test_decode(self):
    479         d = b'\x01\x02\x03abc123\xff\xfe'
    480         de = base64.encodebytes(d)
    481         t1 = xmlrpclib.Binary()
    482         t1.decode(de)
    483         self.assertEqual(str(t1), str(d, "latin-1"))
    484 
    485         t2 = xmlrpclib._binary(de)
    486         self.assertEqual(str(t2), str(d, "latin-1"))
    487 
    488 
# Address of the test server currently running.  http_server() and
# http_multi_server() assign these after binding (and reset PORT to None on
# shutdown); the client tests read URL to reach the server.
ADDR = PORT = URL = None
    490 
    491 # The evt is set twice.  First when the server is ready to serve.
    492 # Second when the server has been shutdown.  The user must clear
    493 # the event after it has been set the first time to catch the second set.
    494 def http_server(evt, numrequests, requestHandler=None, encoding=None):
    495     class TestInstanceClass:
    496         def div(self, x, y):
    497             return x // y
    498 
    499         def _methodHelp(self, name):
    500             if name == 'div':
    501                 return 'This is the div function'
    502 
    503         class Fixture:
    504             @staticmethod
    505             def getData():
    506                 return '42'
    507 
    508     def my_function():
    509         '''This is my function'''
    510         return True
    511 
    512     class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
    513         def get_request(self):
    514             # Ensure the socket is always non-blocking.  On Linux, socket
    515             # attributes are not inherited like they are on *BSD and Windows.
    516             s, port = self.socket.accept()
    517             s.setblocking(True)
    518             return s, port
    519 
    520     if not requestHandler:
    521         requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    522     serv = MyXMLRPCServer(("localhost", 0), requestHandler,
    523                           encoding=encoding,
    524                           logRequests=False, bind_and_activate=False)
    525     try:
    526         serv.server_bind()
    527         global ADDR, PORT, URL
    528         ADDR, PORT = serv.socket.getsockname()
    529         #connect to IP address directly.  This avoids socket.create_connection()
    530         #trying to connect to "localhost" using all address families, which
    531         #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
    532         #on AF_INET only.
    533         URL = "http://%s:%d"%(ADDR, PORT)
    534         serv.server_activate()
    535         serv.register_introspection_functions()
    536         serv.register_multicall_functions()
    537         serv.register_function(pow)
    538         serv.register_function(lambda x,y: x+y, 'add')
    539         serv.register_function(lambda x: x, 'tt')
    540         serv.register_function(my_function)
    541         testInstance = TestInstanceClass()
    542         serv.register_instance(testInstance, allow_dotted_names=True)
    543         evt.set()
    544 
    545         # handle up to 'numrequests' requests
    546         while numrequests > 0:
    547             serv.handle_request()
    548             numrequests -= 1
    549 
    550     except socket.timeout:
    551         pass
    552     finally:
    553         serv.socket.close()
    554         PORT = None
    555         evt.set()
    556 
    557 def http_multi_server(evt, numrequests, requestHandler=None):
    558     class TestInstanceClass:
    559         def div(self, x, y):
    560             return x // y
    561 
    562         def _methodHelp(self, name):
    563             if name == 'div':
    564                 return 'This is the div function'
    565 
    566     def my_function():
    567         '''This is my function'''
    568         return True
    569 
    570     class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
    571         def get_request(self):
    572             # Ensure the socket is always non-blocking.  On Linux, socket
    573             # attributes are not inherited like they are on *BSD and Windows.
    574             s, port = self.socket.accept()
    575             s.setblocking(True)
    576             return s, port
    577 
    578     if not requestHandler:
    579         requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    580     class MyRequestHandler(requestHandler):
    581         rpc_paths = []
    582 
    583     class BrokenDispatcher:
    584         def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
    585             raise RuntimeError("broken dispatcher")
    586 
    587     serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
    588                           logRequests=False, bind_and_activate=False)
    589     serv.socket.settimeout(3)
    590     serv.server_bind()
    591     try:
    592         global ADDR, PORT, URL
    593         ADDR, PORT = serv.socket.getsockname()
    594         #connect to IP address directly.  This avoids socket.create_connection()
    595         #trying to connect to "localhost" using all address families, which
    596         #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
    597         #on AF_INET only.
    598         URL = "http://%s:%d"%(ADDR, PORT)
    599         serv.server_activate()
    600         paths = ["/foo", "/foo/bar"]
    601         for path in paths:
    602             d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
    603             d.register_introspection_functions()
    604             d.register_multicall_functions()
    605         serv.get_dispatcher(paths[0]).register_function(pow)
    606         serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
    607         serv.add_dispatcher("/is/broken", BrokenDispatcher())
    608         evt.set()
    609 
    610         # handle up to 'numrequests' requests
    611         while numrequests > 0:
    612             serv.handle_request()
    613             numrequests -= 1
    614 
    615     except socket.timeout:
    616         pass
    617     finally:
    618         serv.socket.close()
    619         PORT = None
    620         evt.set()
    621 
    622 # This function prevents errors like:
    623 #    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
    624 def is_unavailable_exception(e):
    625     '''Returns True if the given ProtocolError is the product of a server-side
    626        exception caused by the 'temporarily unavailable' response sometimes
    627        given by operations on non-blocking sockets.'''
    628 
    629     # sometimes we get a -1 error code and/or empty headers
    630     try:
    631         if e.errcode == -1 or e.headers is None:
    632             return True
    633         exc_mess = e.headers.get('X-exception')
    634     except AttributeError:
    635         # Ignore OSErrors here.
    636         exc_mess = str(e)
    637 
    638     if exc_mess and 'temporarily unavailable' in exc_mess.lower():
    639         return True
    640 
    641 def make_request_and_skipIf(condition, reason):
    642     # If we skip the test, we have to make a request because
    643     # the server created in setUp blocks expecting one to come in.
    644     if not condition:
    645         return lambda func: func
    646     def decorator(func):
    647         def make_request_and_skip(self):
    648             try:
    649                 xmlrpclib.ServerProxy(URL).my_function()
    650             except (xmlrpclib.ProtocolError, OSError) as e:
    651                 if not is_unavailable_exception(e):
    652                     raise
    653             raise unittest.SkipTest(reason)
    654         return make_request_and_skip
    655     return decorator
    656 
    657 @unittest.skipUnless(threading, 'Threading required for this test.')
    658 class BaseServerTestCase(unittest.TestCase):
    659     requestHandler = None
    660     request_count = 1
    661     threadFunc = staticmethod(http_server)
    662 
    663     def setUp(self):
    664         # enable traceback reporting
    665         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
    666 
    667         self.evt = threading.Event()
    668         # start server thread to handle requests
    669         serv_args = (self.evt, self.request_count, self.requestHandler)
    670         threading.Thread(target=self.threadFunc, args=serv_args).start()
    671 
    672         # wait for the server to be ready
    673         self.evt.wait()
    674         self.evt.clear()
    675 
    676     def tearDown(self):
    677         # wait on the server thread to terminate
    678         self.evt.wait()
    679 
    680         # disable traceback reporting
    681         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
    682 
    683 class SimpleServerTestCase(BaseServerTestCase):
    def test_simple1(self):
        # A plain call to the registered pow() must return the right value.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if is_unavailable_exception(exc):
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))
    693 
    def test_nonascii(self):
        # Non-ASCII string arguments must round-trip through add().
        head = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        tail = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(head, tail), head + tail)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if is_unavailable_exception(exc):
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))
    706 
    707     def test_client_encoding(self):
    708         start_string = '\u20ac'
    709         end_string = '\xa4'
    710 
    711         try:
    712             p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
    713             self.assertEqual(p.add(start_string, end_string),
    714                              start_string + end_string)
    715         except (xmlrpclib.ProtocolError, socket.error) as e:
    716             # ignore failures due to non-blocking socket unavailable errors.
    717             if not is_unavailable_exception(e):
    718                 # protocol error; provide additional information in test output
    719                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    720 
    721     def test_nonascii_methodname(self):
    722         try:
    723             p = xmlrpclib.ServerProxy(URL, encoding='ascii')
    724             self.assertEqual(p.tt(42), 42)
    725         except (xmlrpclib.ProtocolError, socket.error) as e:
    726             # ignore failures due to non-blocking socket unavailable errors.
    727             if not is_unavailable_exception(e):
    728                 # protocol error; provide additional information in test output
    729                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    730 
    731     # [ch] The test 404 is causing lots of false alarms.
    732     def XXXtest_404(self):
    733         # send POST with http.client, it should return 404 header and
    734         # 'Not Found' message.
    735         conn = httplib.client.HTTPConnection(ADDR, PORT)
    736         conn.request('POST', '/this-is-not-valid')
    737         response = conn.getresponse()
    738         conn.close()
    739 
    740         self.assertEqual(response.status, 404)
    741         self.assertEqual(response.reason, 'Not Found')
    742 
    743     def test_introspection1(self):
    744         expected_methods = set(['pow', 'div', 'my_function', 'add', 'tt',
    745                                 'system.listMethods', 'system.methodHelp',
    746                                 'system.methodSignature', 'system.multicall',
    747                                 'Fixture'])
    748         try:
    749             p = xmlrpclib.ServerProxy(URL)
    750             meth = p.system.listMethods()
    751             self.assertEqual(set(meth), expected_methods)
    752         except (xmlrpclib.ProtocolError, OSError) as e:
    753             # ignore failures due to non-blocking socket 'unavailable' errors
    754             if not is_unavailable_exception(e):
    755                 # protocol error; provide additional information in test output
    756                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    757 
    758 
    759     def test_introspection2(self):
    760         try:
    761             # test _methodHelp()
    762             p = xmlrpclib.ServerProxy(URL)
    763             divhelp = p.system.methodHelp('div')
    764             self.assertEqual(divhelp, 'This is the div function')
    765         except (xmlrpclib.ProtocolError, OSError) as e:
    766             # ignore failures due to non-blocking socket 'unavailable' errors
    767             if not is_unavailable_exception(e):
    768                 # protocol error; provide additional information in test output
    769                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    770 
    771     @make_request_and_skipIf(sys.flags.optimize >= 2,
    772                      "Docstrings are omitted with -O2 and above")
    773     def test_introspection3(self):
    774         try:
    775             # test native doc
    776             p = xmlrpclib.ServerProxy(URL)
    777             myfunction = p.system.methodHelp('my_function')
    778             self.assertEqual(myfunction, 'This is my function')
    779         except (xmlrpclib.ProtocolError, OSError) as e:
    780             # ignore failures due to non-blocking socket 'unavailable' errors
    781             if not is_unavailable_exception(e):
    782                 # protocol error; provide additional information in test output
    783                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    784 
    785     def test_introspection4(self):
    786         # the SimpleXMLRPCServer doesn't support signatures, but
    787         # at least check that we can try making the call
    788         try:
    789             p = xmlrpclib.ServerProxy(URL)
    790             divsig = p.system.methodSignature('div')
    791             self.assertEqual(divsig, 'signatures not supported')
    792         except (xmlrpclib.ProtocolError, OSError) as e:
    793             # ignore failures due to non-blocking socket 'unavailable' errors
    794             if not is_unavailable_exception(e):
    795                 # protocol error; provide additional information in test output
    796                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    797 
    798     def test_multicall(self):
    799         try:
    800             p = xmlrpclib.ServerProxy(URL)
    801             multicall = xmlrpclib.MultiCall(p)
    802             multicall.add(2,3)
    803             multicall.pow(6,8)
    804             multicall.div(127,42)
    805             add_result, pow_result, div_result = multicall()
    806             self.assertEqual(add_result, 2+3)
    807             self.assertEqual(pow_result, 6**8)
    808             self.assertEqual(div_result, 127//42)
    809         except (xmlrpclib.ProtocolError, OSError) as e:
    810             # ignore failures due to non-blocking socket 'unavailable' errors
    811             if not is_unavailable_exception(e):
    812                 # protocol error; provide additional information in test output
    813                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    814 
    815     def test_non_existing_multicall(self):
    816         try:
    817             p = xmlrpclib.ServerProxy(URL)
    818             multicall = xmlrpclib.MultiCall(p)
    819             multicall.this_is_not_exists()
    820             result = multicall()
    821 
    822             # result.results contains;
    823             # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
    824             #   'method "this_is_not_exists" is not supported'>}]
    825 
    826             self.assertEqual(result.results[0]['faultCode'], 1)
    827             self.assertEqual(result.results[0]['faultString'],
    828                 '<class \'Exception\'>:method "this_is_not_exists" '
    829                 'is not supported')
    830         except (xmlrpclib.ProtocolError, OSError) as e:
    831             # ignore failures due to non-blocking socket 'unavailable' errors
    832             if not is_unavailable_exception(e):
    833                 # protocol error; provide additional information in test output
    834                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    835 
    836     def test_dotted_attribute(self):
    837         # Raises an AttributeError because private methods are not allowed.
    838         self.assertRaises(AttributeError,
    839                           xmlrpc.server.resolve_dotted_attribute, str, '__add')
    840 
    841         self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
    842         # Get the test to run faster by sending a request with test_simple1.
    843         # This avoids waiting for the socket timeout.
    844         self.test_simple1()
    845 
    846     def test_allow_dotted_names_true(self):
    847         # XXX also need allow_dotted_names_false test.
    848         server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
    849         data = server.Fixture.getData()
    850         self.assertEqual(data, '42')
    851 
    852     def test_unicode_host(self):
    853         server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
    854         self.assertEqual(server.add("a", "\xe9"), "a\xe9")
    855 
    856     def test_partial_post(self):
    857         # Check that a partial POST doesn't make the server loop: issue #14001.
    858         conn = http.client.HTTPConnection(ADDR, PORT)
    859         conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
    860         conn.close()
    861 
    862     def test_context_manager(self):
    863         with xmlrpclib.ServerProxy(URL) as server:
    864             server.add(2, 3)
    865             self.assertNotEqual(server('transport')._connection,
    866                                 (None, None))
    867         self.assertEqual(server('transport')._connection,
    868                          (None, None))
    869 
    870     def test_context_manager_method_error(self):
    871         try:
    872             with xmlrpclib.ServerProxy(URL) as server:
    873                 server.add(2, "a")
    874         except xmlrpclib.Fault:
    875             pass
    876         self.assertEqual(server('transport')._connection,
    877                          (None, None))
    878 
    879 
    880 class SimpleServerEncodingTestCase(BaseServerTestCase):
    881     @staticmethod
    882     def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
    883         http_server(evt, numrequests, requestHandler, 'iso-8859-15')
    884 
    885     def test_server_encoding(self):
    886         start_string = '\u20ac'
    887         end_string = '\xa4'
    888 
    889         try:
    890             p = xmlrpclib.ServerProxy(URL)
    891             self.assertEqual(p.add(start_string, end_string),
    892                              start_string + end_string)
    893         except (xmlrpclib.ProtocolError, socket.error) as e:
    894             # ignore failures due to non-blocking socket unavailable errors.
    895             if not is_unavailable_exception(e):
    896                 # protocol error; provide additional information in test output
    897                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    898 
    899 
    900 class MultiPathServerTestCase(BaseServerTestCase):
    901     threadFunc = staticmethod(http_multi_server)
    902     request_count = 2
    903     def test_path1(self):
    904         p = xmlrpclib.ServerProxy(URL+"/foo")
    905         self.assertEqual(p.pow(6,8), 6**8)
    906         self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
    907 
    908     def test_path2(self):
    909         p = xmlrpclib.ServerProxy(URL+"/foo/bar")
    910         self.assertEqual(p.add(6,8), 6+8)
    911         self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
    912 
    913     def test_path3(self):
    914         p = xmlrpclib.ServerProxy(URL+"/is/broken")
    915         self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
    916 
    917 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
    918 #does indeed serve subsequent requests on the same connection
    919 class BaseKeepaliveServerTestCase(BaseServerTestCase):
    920     #a request handler that supports keep-alive and logs requests into a
    921     #class variable
    922     class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
    923         parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
    924         protocol_version = 'HTTP/1.1'
    925         myRequests = []
    926         def handle(self):
    927             self.myRequests.append([])
    928             self.reqidx = len(self.myRequests)-1
    929             return self.parentClass.handle(self)
    930         def handle_one_request(self):
    931             result = self.parentClass.handle_one_request(self)
    932             self.myRequests[self.reqidx].append(self.raw_requestline)
    933             return result
    934 
    935     requestHandler = RequestHandler
    936     def setUp(self):
    937         #clear request log
    938         self.RequestHandler.myRequests = []
    939         return BaseServerTestCase.setUp(self)
    940 
    941 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
    942 #does indeed serve subsequent requests on the same connection
    943 class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    944     def test_two(self):
    945         p = xmlrpclib.ServerProxy(URL)
    946         #do three requests.
    947         self.assertEqual(p.pow(6,8), 6**8)
    948         self.assertEqual(p.pow(6,8), 6**8)
    949         self.assertEqual(p.pow(6,8), 6**8)
    950         p("close")()
    951 
    952         #they should have all been handled by a single request handler
    953         self.assertEqual(len(self.RequestHandler.myRequests), 1)
    954 
    955         #check that we did at least two (the third may be pending append
    956         #due to thread scheduling)
    957         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    958 
    959 
    960 #test special attribute access on the serverproxy, through the __call__
    961 #function.
    962 class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    963     #ask for two keepalive requests to be handled.
    964     request_count=2
    965 
    966     def test_close(self):
    967         p = xmlrpclib.ServerProxy(URL)
    968         #do some requests with close.
    969         self.assertEqual(p.pow(6,8), 6**8)
    970         self.assertEqual(p.pow(6,8), 6**8)
    971         self.assertEqual(p.pow(6,8), 6**8)
    972         p("close")() #this should trigger a new keep-alive request
    973         self.assertEqual(p.pow(6,8), 6**8)
    974         self.assertEqual(p.pow(6,8), 6**8)
    975         self.assertEqual(p.pow(6,8), 6**8)
    976         p("close")()
    977 
    978         #they should have all been two request handlers, each having logged at least
    979         #two complete requests
    980         self.assertEqual(len(self.RequestHandler.myRequests), 2)
    981         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    982         self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
    983 
    984 
    985     def test_transport(self):
    986         p = xmlrpclib.ServerProxy(URL)
    987         #do some requests with close.
    988         self.assertEqual(p.pow(6,8), 6**8)
    989         p("transport").close() #same as above, really.
    990         self.assertEqual(p.pow(6,8), 6**8)
    991         p("close")()
    992         self.assertEqual(len(self.RequestHandler.myRequests), 2)
    993 
    994 #A test case that verifies that gzip encoding works in both directions
    995 #(for a request and the response)
    996 @unittest.skipIf(gzip is None, 'requires gzip')
    997 class GzipServerTestCase(BaseServerTestCase):
    998     #a request handler that supports keep-alive and logs requests into a
    999     #class variable
   1000     class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
   1001         parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
   1002         protocol_version = 'HTTP/1.1'
   1003 
   1004         def do_POST(self):
   1005             #store content of last request in class
   1006             self.__class__.content_length = int(self.headers["content-length"])
   1007             return self.parentClass.do_POST(self)
   1008     requestHandler = RequestHandler
   1009 
   1010     class Transport(xmlrpclib.Transport):
   1011         #custom transport, stores the response length for our perusal
   1012         fake_gzip = False
   1013         def parse_response(self, response):
   1014             self.response_length=int(response.getheader("content-length", 0))
   1015             return xmlrpclib.Transport.parse_response(self, response)
   1016 
   1017         def send_content(self, connection, body):
   1018             if self.fake_gzip:
   1019                 #add a lone gzip header to induce decode error remotely
   1020                 connection.putheader("Content-Encoding", "gzip")
   1021             return xmlrpclib.Transport.send_content(self, connection, body)
   1022 
   1023     def setUp(self):
   1024         BaseServerTestCase.setUp(self)
   1025 
   1026     def test_gzip_request(self):
   1027         t = self.Transport()
   1028         t.encode_threshold = None
   1029         p = xmlrpclib.ServerProxy(URL, transport=t)
   1030         self.assertEqual(p.pow(6,8), 6**8)
   1031         a = self.RequestHandler.content_length
   1032         t.encode_threshold = 0 #turn on request encoding
   1033         self.assertEqual(p.pow(6,8), 6**8)
   1034         b = self.RequestHandler.content_length
   1035         self.assertTrue(a>b)
   1036         p("close")()
   1037 
   1038     def test_bad_gzip_request(self):
   1039         t = self.Transport()
   1040         t.encode_threshold = None
   1041         t.fake_gzip = True
   1042         p = xmlrpclib.ServerProxy(URL, transport=t)
   1043         cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
   1044                                     re.compile(r"\b400\b"))
   1045         with cm:
   1046             p.pow(6, 8)
   1047         p("close")()
   1048 
   1049     def test_gzip_response(self):
   1050         t = self.Transport()
   1051         p = xmlrpclib.ServerProxy(URL, transport=t)
   1052         old = self.requestHandler.encode_threshold
   1053         self.requestHandler.encode_threshold = None #no encoding
   1054         self.assertEqual(p.pow(6,8), 6**8)
   1055         a = t.response_length
   1056         self.requestHandler.encode_threshold = 0 #always encode
   1057         self.assertEqual(p.pow(6,8), 6**8)
   1058         p("close")()
   1059         b = t.response_length
   1060         self.requestHandler.encode_threshold = old
   1061         self.assertTrue(a>b)
   1062 
   1063 
   1064 @unittest.skipIf(gzip is None, 'requires gzip')
   1065 class GzipUtilTestCase(unittest.TestCase):
   1066 
   1067     def test_gzip_decode_limit(self):
   1068         max_gzip_decode = 20 * 1024 * 1024
   1069         data = b'\0' * max_gzip_decode
   1070         encoded = xmlrpclib.gzip_encode(data)
   1071         decoded = xmlrpclib.gzip_decode(encoded)
   1072         self.assertEqual(len(decoded), max_gzip_decode)
   1073 
   1074         data = b'\0' * (max_gzip_decode + 1)
   1075         encoded = xmlrpclib.gzip_encode(data)
   1076 
   1077         with self.assertRaisesRegex(ValueError,
   1078                                     "max gzipped payload length exceeded"):
   1079             xmlrpclib.gzip_decode(encoded)
   1080 
   1081         xmlrpclib.gzip_decode(encoded, max_decode=-1)
   1082 
   1083 
   1084 #Test special attributes of the ServerProxy object
   1085 class ServerProxyTestCase(unittest.TestCase):
   1086     def setUp(self):
   1087         unittest.TestCase.setUp(self)
   1088         if threading:
   1089             self.url = URL
   1090         else:
   1091             # Without threading, http_server() and http_multi_server() will not
   1092             # be executed and URL is still equal to None. 'http://' is a just
   1093             # enough to choose the scheme (HTTP)
   1094             self.url = 'http://'
   1095 
   1096     def test_close(self):
   1097         p = xmlrpclib.ServerProxy(self.url)
   1098         self.assertEqual(p('close')(), None)
   1099 
   1100     def test_transport(self):
   1101         t = xmlrpclib.Transport()
   1102         p = xmlrpclib.ServerProxy(self.url, transport=t)
   1103         self.assertEqual(p('transport'), t)
   1104 
   1105 
   1106 # This is a contrived way to make a failure occur on the server side
   1107 # in order to test the _send_traceback_header flag on the server
   1108 class FailingMessageClass(http.client.HTTPMessage):
   1109     def get(self, key, failobj=None):
   1110         key = key.lower()
   1111         if key == 'content-length':
   1112             return 'I am broken'
   1113         return super().get(key, failobj)
   1114 
   1115 
   1116 @unittest.skipUnless(threading, 'Threading required for this test.')
   1117 class FailingServerTestCase(unittest.TestCase):
   1118     def setUp(self):
   1119         self.evt = threading.Event()
   1120         # start server thread to handle requests
   1121         serv_args = (self.evt, 1)
   1122         threading.Thread(target=http_server, args=serv_args).start()
   1123 
   1124         # wait for the server to be ready
   1125         self.evt.wait()
   1126         self.evt.clear()
   1127 
   1128     def tearDown(self):
   1129         # wait on the server thread to terminate
   1130         self.evt.wait()
   1131         # reset flag
   1132         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
   1133         # reset message class
   1134         default_class = http.client.HTTPMessage
   1135         xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class
   1136 
   1137     def test_basic(self):
   1138         # check that flag is false by default
   1139         flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
   1140         self.assertEqual(flagval, False)
   1141 
   1142         # enable traceback reporting
   1143         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
   1144 
   1145         # test a call that shouldn't fail just as a smoke test
   1146         try:
   1147             p = xmlrpclib.ServerProxy(URL)
   1148             self.assertEqual(p.pow(6,8), 6**8)
   1149         except (xmlrpclib.ProtocolError, OSError) as e:
   1150             # ignore failures due to non-blocking socket 'unavailable' errors
   1151             if not is_unavailable_exception(e):
   1152                 # protocol error; provide additional information in test output
   1153                 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
   1154 
   1155     def test_fail_no_info(self):
   1156         # use the broken message class
   1157         xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
   1158 
   1159         try:
   1160             p = xmlrpclib.ServerProxy(URL)
   1161             p.pow(6,8)
   1162         except (xmlrpclib.ProtocolError, OSError) as e:
   1163             # ignore failures due to non-blocking socket 'unavailable' errors
   1164             if not is_unavailable_exception(e) and hasattr(e, "headers"):
   1165                 # The two server-side error headers shouldn't be sent back in this case
   1166                 self.assertTrue(e.headers.get("X-exception") is None)
   1167                 self.assertTrue(e.headers.get("X-traceback") is None)
   1168         else:
   1169             self.fail('ProtocolError not raised')
   1170 
   1171     def test_fail_with_info(self):
   1172         # use the broken message class
   1173         xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
   1174 
   1175         # Check that errors in the server send back exception/traceback
   1176         # info when flag is set
   1177         xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
   1178 
   1179         try:
   1180             p = xmlrpclib.ServerProxy(URL)
   1181             p.pow(6,8)
   1182         except (xmlrpclib.ProtocolError, OSError) as e:
   1183             # ignore failures due to non-blocking socket 'unavailable' errors
   1184             if not is_unavailable_exception(e) and hasattr(e, "headers"):
   1185                 # We should get error info in the response
   1186                 expected_err = "invalid literal for int() with base 10: 'I am broken'"
   1187                 self.assertEqual(e.headers.get("X-exception"), expected_err)
   1188                 self.assertTrue(e.headers.get("X-traceback") is not None)
   1189         else:
   1190             self.fail('ProtocolError not raised')
   1191 
   1192 
   1193 @contextlib.contextmanager
   1194 def captured_stdout(encoding='utf-8'):
   1195     """A variation on support.captured_stdout() which gives a text stream
   1196     having a `buffer` attribute.
   1197     """
   1198     orig_stdout = sys.stdout
   1199     sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
   1200     try:
   1201         yield sys.stdout
   1202     finally:
   1203         sys.stdout = orig_stdout
   1204 
   1205 
   1206 class CGIHandlerTestCase(unittest.TestCase):
   1207     def setUp(self):
   1208         self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
   1209 
   1210     def tearDown(self):
   1211         self.cgi = None
   1212 
   1213     def test_cgi_get(self):
   1214         with support.EnvironmentVarGuard() as env:
   1215             env['REQUEST_METHOD'] = 'GET'
   1216             # if the method is GET and no request_text is given, it runs handle_get
   1217             # get sysout output
   1218             with captured_stdout(encoding=self.cgi.encoding) as data_out:
   1219                 self.cgi.handle_request()
   1220 
   1221             # parse Status header
   1222             data_out.seek(0)
   1223             handle = data_out.read()
   1224             status = handle.split()[1]
   1225             message = ' '.join(handle.split()[2:4])
   1226 
   1227             self.assertEqual(status, '400')
   1228             self.assertEqual(message, 'Bad Request')
   1229 
   1230 
   1231     def test_cgi_xmlrpc_response(self):
   1232         data = """<?xml version='1.0'?>
   1233         <methodCall>
   1234             <methodName>test_method</methodName>
   1235             <params>
   1236                 <param>
   1237                     <value><string>foo</string></value>
   1238                 </param>
   1239                 <param>
   1240                     <value><string>bar</string></value>
   1241                 </param>
   1242             </params>
   1243         </methodCall>
   1244         """
   1245 
   1246         with support.EnvironmentVarGuard() as env, \
   1247              captured_stdout(encoding=self.cgi.encoding) as data_out, \
   1248              support.captured_stdin() as data_in:
   1249             data_in.write(data)
   1250             data_in.seek(0)
   1251             env['CONTENT_LENGTH'] = str(len(data))
   1252             self.cgi.handle_request()
   1253         data_out.seek(0)
   1254 
   1255         # will respond exception, if so, our goal is achieved ;)
   1256         handle = data_out.read()
   1257 
   1258         # start with 44th char so as not to get http header, we just
   1259         # need only xml
   1260         self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
   1261 
   1262         # Also test the content-length returned  by handle_request
   1263         # Using the same test method inorder to avoid all the datapassing
   1264         # boilerplate code.
   1265         # Test for bug: http://bugs.python.org/issue5040
   1266 
   1267         content = handle[handle.find("<?xml"):]
   1268 
   1269         self.assertEqual(
   1270             int(re.search(r'Content-Length: (\d+)', handle).group(1)),
   1271             len(content))
   1272 
   1273 
   1274 class UseBuiltinTypesTestCase(unittest.TestCase):
   1275 
   1276     def test_use_builtin_types(self):
   1277         # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
   1278         # makes all dispatch of binary data as bytes instances, and all
   1279         # dispatch of datetime argument as datetime.datetime instances.
   1280         self.log = []
   1281         expected_bytes = b"my dog has fleas"
   1282         expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
   1283         marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
   1284         def foobar(*args):
   1285             self.log.extend(args)
   1286         handler = xmlrpc.server.SimpleXMLRPCDispatcher(
   1287             allow_none=True, encoding=None, use_builtin_types=True)
   1288         handler.register_function(foobar)
   1289         handler._marshaled_dispatch(marshaled)
   1290         self.assertEqual(len(self.log), 2)
   1291         mybytes, mydate = self.log
   1292         self.assertEqual(self.log, [expected_bytes, expected_date])
   1293         self.assertIs(type(mydate), datetime.datetime)
   1294         self.assertIs(type(mybytes), bytes)
   1295 
   1296     def test_cgihandler_has_use_builtin_types_flag(self):
   1297         handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True)
   1298         self.assertTrue(handler.use_builtin_types)
   1299 
   1300     def test_xmlrpcserver_has_use_builtin_types_flag(self):
   1301         server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0),
   1302             use_builtin_types=True)
   1303         server.server_close()
   1304         self.assertTrue(server.use_builtin_types)
   1305 
   1306 
   1307 @support.reap_threads
   1308 def test_main():
   1309     support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
   1310             BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase,
   1311             SimpleServerTestCase, SimpleServerEncodingTestCase,
   1312             KeepaliveServerTestCase1, KeepaliveServerTestCase2,
   1313             GzipServerTestCase, GzipUtilTestCase,
   1314             MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase,
   1315             CGIHandlerTestCase)
   1316 
   1317 
# When executed as a script, run the test classes listed in test_main().
if __name__ == "__main__":
    test_main()
   1320