1 import base64 2 import datetime 3 import decimal 4 import sys 5 import time 6 import unittest 7 from unittest import mock 8 import xmlrpc.client as xmlrpclib 9 import xmlrpc.server 10 import http.client 11 import http, http.server 12 import socket 13 import re 14 import io 15 import contextlib 16 from test import support 17 18 try: 19 import gzip 20 except ImportError: 21 gzip = None 22 try: 23 import threading 24 except ImportError: 25 threading = None 26 27 alist = [{'astring': 'foo (at] bar.baz.spam', 28 'afloat': 7283.43, 29 'anint': 2**20, 30 'ashortlong': 2, 31 'anotherlist': ['.zyx.41'], 32 'abase64': xmlrpclib.Binary(b"my dog has fleas"), 33 'b64bytes': b"my dog has fleas", 34 'b64bytearray': bytearray(b"my dog has fleas"), 35 'boolean': False, 36 'unicode': '\u4000\u6000\u8000', 37 'ukey\u4000': 'regular value', 38 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 39 'datetime2': xmlrpclib.DateTime( 40 (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 41 'datetime3': xmlrpclib.DateTime( 42 datetime.datetime(2005, 2, 10, 11, 41, 23)), 43 }] 44 45 class XMLRPCTestCase(unittest.TestCase): 46 47 def test_dump_load(self): 48 dump = xmlrpclib.dumps((alist,)) 49 load = xmlrpclib.loads(dump) 50 self.assertEqual(alist, load[0][0]) 51 52 def test_dump_bare_datetime(self): 53 # This checks that an unwrapped datetime.date object can be handled 54 # by the marshalling code. This can't be done via test_dump_load() 55 # since with use_builtin_types set to 1 the unmarshaller would create 56 # datetime objects for the 'datetime[123]' keys as well 57 dt = datetime.datetime(2005, 2, 10, 11, 41, 23) 58 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) 59 s = xmlrpclib.dumps((dt,)) 60 61 result, m = xmlrpclib.loads(s, use_builtin_types=True) 62 (newdt,) = result 63 self.assertEqual(newdt, dt) 64 self.assertIs(type(newdt), datetime.datetime) 65 self.assertIsNone(m) 66 67 result, m = xmlrpclib.loads(s, use_builtin_types=False) 68 (newdt,) = result 69 self.assertEqual(newdt, dt) 70 self.assertIs(type(newdt), xmlrpclib.DateTime) 71 self.assertIsNone(m) 72 73 result, m = xmlrpclib.loads(s, use_datetime=True) 74 (newdt,) = result 75 self.assertEqual(newdt, dt) 76 self.assertIs(type(newdt), datetime.datetime) 77 self.assertIsNone(m) 78 79 result, m = xmlrpclib.loads(s, use_datetime=False) 80 (newdt,) = result 81 self.assertEqual(newdt, dt) 82 self.assertIs(type(newdt), xmlrpclib.DateTime) 83 self.assertIsNone(m) 84 85 86 def test_datetime_before_1900(self): 87 # same as before but with a date before 1900 88 dt = datetime.datetime(1, 2, 10, 11, 41, 23) 89 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) 90 s = xmlrpclib.dumps((dt,)) 91 92 result, m = xmlrpclib.loads(s, use_builtin_types=True) 93 (newdt,) = result 94 self.assertEqual(newdt, dt) 95 self.assertIs(type(newdt), datetime.datetime) 96 self.assertIsNone(m) 97 98 result, m = xmlrpclib.loads(s, use_builtin_types=False) 99 (newdt,) = result 100 self.assertEqual(newdt, dt) 101 self.assertIs(type(newdt), xmlrpclib.DateTime) 102 self.assertIsNone(m) 103 104 def test_bug_1164912 (self): 105 d = xmlrpclib.DateTime() 106 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 107 methodresponse=True)) 108 self.assertIsInstance(new_d.value, str) 109 110 # Check that the output of dumps() is still an 8-bit string 111 s = xmlrpclib.dumps((new_d,), methodresponse=True) 112 self.assertIsInstance(s, str) 113 114 def test_newstyle_class(self): 115 class T(object): 116 pass 117 t = T() 118 t.x = 100 119 t.y = "Hello" 120 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 121 self.assertEqual(t2, t.__dict__) 122 123 def test_dump_big_long(self): 124 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,)) 125 126 def test_dump_bad_dict(self): 127 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 128 129 def test_dump_recursive_seq(self): 130 l = [1,2,3] 131 t = [3,4,5,l] 132 l.append(t) 133 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 134 135 def test_dump_recursive_dict(self): 136 d = {'1':1, '2':1} 137 t = {'3':3, 'd':d} 138 d['t'] = t 139 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 140 141 def test_dump_big_int(self): 142 if sys.maxsize > 2**31-1: 143 self.assertRaises(OverflowError, xmlrpclib.dumps, 144 (int(2**34),)) 145 146 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 147 self.assertRaises(OverflowError, xmlrpclib.dumps, 148 (xmlrpclib.MAXINT+1,)) 149 self.assertRaises(OverflowError, xmlrpclib.dumps, 150 (xmlrpclib.MININT-1,)) 151 152 def dummy_write(s): 153 pass 154 155 m = xmlrpclib.Marshaller() 156 m.dump_int(xmlrpclib.MAXINT, dummy_write) 157 m.dump_int(xmlrpclib.MININT, dummy_write) 158 self.assertRaises(OverflowError, m.dump_int, 159 xmlrpclib.MAXINT+1, dummy_write) 160 self.assertRaises(OverflowError, m.dump_int, 161 xmlrpclib.MININT-1, dummy_write) 162 163 def test_dump_double(self): 164 xmlrpclib.dumps((float(2 ** 34),)) 165 xmlrpclib.dumps((float(xmlrpclib.MAXINT), 166 float(xmlrpclib.MININT))) 167 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42), 168 float(xmlrpclib.MININT - 42))) 169 170 def dummy_write(s): 171 pass 172 173 m = xmlrpclib.Marshaller() 174 m.dump_double(xmlrpclib.MAXINT, dummy_write) 175 m.dump_double(xmlrpclib.MININT, dummy_write) 176 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write) 177 m.dump_double(xmlrpclib.MININT - 42, dummy_write) 178 179 def test_dump_none(self): 180 value = alist + [None] 181 arg1 = (alist + [None],) 182 strg = xmlrpclib.dumps(arg1, allow_none=True) 183 self.assertEqual(value, 184 xmlrpclib.loads(strg)[0][0]) 185 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 186 187 def test_dump_encoding(self): 188 value = {'key\u20ac\xa4': 189 'value\u20ac\xa4'} 190 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15') 191 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg 192 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 193 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 194 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 195 196 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 197 methodresponse=True) 198 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 199 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 200 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 201 202 methodname = 'method\u20ac\xa4' 203 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 204 methodname=methodname) 205 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 206 self.assertEqual(xmlrpclib.loads(strg)[1], methodname) 207 208 def test_dump_bytes(self): 209 sample = b"my dog has fleas" 210 self.assertEqual(sample, xmlrpclib.Binary(sample)) 211 for type_ in bytes, bytearray, xmlrpclib.Binary: 212 value = type_(sample) 213 s = xmlrpclib.dumps((value,)) 214 215 result, m = xmlrpclib.loads(s, use_builtin_types=True) 216 (newvalue,) = result 217 self.assertEqual(newvalue, sample) 218 self.assertIs(type(newvalue), bytes) 219 self.assertIsNone(m) 220 221 result, m = xmlrpclib.loads(s, use_builtin_types=False) 222 (newvalue,) = result 223 self.assertEqual(newvalue, sample) 224 self.assertIs(type(newvalue), xmlrpclib.Binary) 225 self.assertIsNone(m) 226 227 def test_loads_unsupported(self): 228 ResponseError = xmlrpclib.ResponseError 229 data = '<params><param><value><spam/></value></param></params>' 230 self.assertRaises(ResponseError, xmlrpclib.loads, data) 231 data = ('<params><param><value><array>' 232 '<value><spam/></value>' 233 '</array></value></param></params>') 234 self.assertRaises(ResponseError, xmlrpclib.loads, data) 235 data = ('<params><param><value><struct>' 236 '<member><name>a</name><value><spam/></value></member>' 237 '<member><name>b</name><value><spam/></value></member>' 238 '</struct></value></param></params>') 239 self.assertRaises(ResponseError, xmlrpclib.loads, data) 240 241 def check_loads(self, s, value, **kwargs): 242 dump = '<params><param><value>%s</value></param></params>' % s 243 result, m = xmlrpclib.loads(dump, **kwargs) 244 (newvalue,) = result 245 self.assertEqual(newvalue, value) 246 self.assertIs(type(newvalue), type(value)) 247 self.assertIsNone(m) 248 249 def test_load_standard_types(self): 250 check = self.check_loads 251 check('string', 'string') 252 check('<string>string</string>', 'string') 253 check('<string> string</string>', ' string') 254 check('<int>2056183947</int>', 2056183947) 255 check('<int>-2056183947</int>', -2056183947) 256 check('<i4>2056183947</i4>', 2056183947) 257 check('<double>46093.78125</double>', 46093.78125) 258 check('<boolean>0</boolean>', False) 259 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 260 xmlrpclib.Binary(b'\x00byte string\xff')) 261 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 262 b'\x00byte string\xff', use_builtin_types=True) 263 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 264 xmlrpclib.DateTime('20050210T11:41:23')) 265 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 266 datetime.datetime(2005, 2, 10, 11, 41, 23), 267 use_builtin_types=True) 268 check('<array><data>' 269 '<value><int>1</int></value><value><int>2</int></value>' 270 '</data></array>', [1, 2]) 271 check('<struct>' 272 '<member><name>b</name><value><int>2</int></value></member>' 273 '<member><name>a</name><value><int>1</int></value></member>' 274 '</struct>', {'a': 1, 'b': 2}) 275 276 def test_load_extension_types(self): 277 check = self.check_loads 278 check('<nil/>', None) 279 check('<ex:nil/>', None) 280 check('<i1>205</i1>', 205) 281 check('<i2>20561</i2>', 20561) 282 check('<i8>9876543210</i8>', 9876543210) 283 check('<biginteger>98765432100123456789</biginteger>', 284 98765432100123456789) 285 check('<float>93.78125</float>', 93.78125) 286 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 287 decimal.Decimal('9876543210.0123456789')) 288 289 def test_get_host_info(self): 290 # see bug #3613, this raised a TypeError 291 transp = xmlrpc.client.Transport() 292 self.assertEqual(transp.get_host_info("user (at] host.tld"), 293 ('host.tld', 294 [('Authorization', 'Basic dXNlcg==')], {})) 295 296 def test_ssl_presence(self): 297 try: 298 import ssl 299 except ImportError: 300 has_ssl = False 301 else: 302 has_ssl = True 303 try: 304 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 305 except NotImplementedError: 306 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 307 except OSError: 308 self.assertTrue(has_ssl) 309 310 @unittest.skipUnless(threading, "Threading required for this test.") 311 def test_keepalive_disconnect(self): 312 class RequestHandler(http.server.BaseHTTPRequestHandler): 313 protocol_version = "HTTP/1.1" 314 handled = False 315 316 def do_POST(self): 317 length = int(self.headers.get("Content-Length")) 318 self.rfile.read(length) 319 if self.handled: 320 self.close_connection = True 321 return 322 response = xmlrpclib.dumps((5,), methodresponse=True) 323 response = response.encode() 324 self.send_response(http.HTTPStatus.OK) 325 self.send_header("Content-Length", len(response)) 326 self.end_headers() 327 self.wfile.write(response) 328 self.handled = True 329 self.close_connection = False 330 331 def run_server(): 332 server.socket.settimeout(float(1)) # Don't hang if client fails 333 server.handle_request() # First request and attempt at second 334 server.handle_request() # Retried second request 335 336 server = http.server.HTTPServer((support.HOST, 0), RequestHandler) 337 self.addCleanup(server.server_close) 338 thread = threading.Thread(target=run_server) 339 thread.start() 340 self.addCleanup(thread.join) 341 url = "http://{}:{}/".format(*server.server_address) 342 with xmlrpclib.ServerProxy(url) as p: 343 self.assertEqual(p.method(), 5) 344 self.assertEqual(p.method(), 5) 345 346 class HelperTestCase(unittest.TestCase): 347 def test_escape(self): 348 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 349 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 350 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 351 352 class FaultTestCase(unittest.TestCase): 353 def test_repr(self): 354 f = xmlrpclib.Fault(42, 'Test Fault') 355 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 356 self.assertEqual(repr(f), str(f)) 357 358 def test_dump_fault(self): 359 f = xmlrpclib.Fault(42, 'Test Fault') 360 s = xmlrpclib.dumps((f,)) 361 (newf,), m = xmlrpclib.loads(s) 362 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 363 self.assertEqual(m, None) 364 365 s = xmlrpclib.Marshaller().dumps(f) 366 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 367 368 def test_dotted_attribute(self): 369 # this will raise AttributeError because code don't want us to use 370 # private methods 371 self.assertRaises(AttributeError, 372 xmlrpc.server.resolve_dotted_attribute, str, '__add') 373 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 374 375 class DateTimeTestCase(unittest.TestCase): 376 def test_default(self): 377 with mock.patch('time.localtime') as localtime_mock: 378 time_struct = time.struct_time( 379 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 380 localtime_mock.return_value = time_struct 381 localtime = time.localtime() 382 t = xmlrpclib.DateTime() 383 self.assertEqual(str(t), 384 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 385 386 def test_time(self): 387 d = 1181399930.036952 388 t = xmlrpclib.DateTime(d) 389 self.assertEqual(str(t), 390 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 391 392 def test_time_tuple(self): 393 d = (2007,6,9,10,38,50,5,160,0) 394 t = xmlrpclib.DateTime(d) 395 self.assertEqual(str(t), '20070609T10:38:50') 396 397 def test_time_struct(self): 398 d = time.localtime(1181399930.036952) 399 t = xmlrpclib.DateTime(d) 400 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 401 402 def test_datetime_datetime(self): 403 d = datetime.datetime(2007,1,2,3,4,5) 404 t = xmlrpclib.DateTime(d) 405 self.assertEqual(str(t), '20070102T03:04:05') 406 407 def test_repr(self): 408 d = datetime.datetime(2007,1,2,3,4,5) 409 t = xmlrpclib.DateTime(d) 410 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 411 self.assertEqual(repr(t), val) 412 413 def test_decode(self): 414 d = ' 20070908T07:11:13 ' 415 t1 = xmlrpclib.DateTime() 416 t1.decode(d) 417 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 418 self.assertEqual(t1, tref) 419 420 t2 = xmlrpclib._datetime(d) 421 self.assertEqual(t2, tref) 422 423 def test_comparison(self): 424 now = datetime.datetime.now() 425 dtime = xmlrpclib.DateTime(now.timetuple()) 426 427 # datetime vs. DateTime 428 self.assertTrue(dtime == now) 429 self.assertTrue(now == dtime) 430 then = now + datetime.timedelta(seconds=4) 431 self.assertTrue(then >= dtime) 432 self.assertTrue(dtime < then) 433 434 # str vs. DateTime 435 dstr = now.strftime("%Y%m%dT%H:%M:%S") 436 self.assertTrue(dtime == dstr) 437 self.assertTrue(dstr == dtime) 438 dtime_then = xmlrpclib.DateTime(then.timetuple()) 439 self.assertTrue(dtime_then >= dstr) 440 self.assertTrue(dstr < dtime_then) 441 442 # some other types 443 dbytes = dstr.encode('ascii') 444 dtuple = now.timetuple() 445 with self.assertRaises(TypeError): 446 dtime == 1970 447 with self.assertRaises(TypeError): 448 dtime != dbytes 449 with self.assertRaises(TypeError): 450 dtime == bytearray(dbytes) 451 with self.assertRaises(TypeError): 452 dtime != dtuple 453 with self.assertRaises(TypeError): 454 dtime < float(1970) 455 with self.assertRaises(TypeError): 456 dtime > dbytes 457 with self.assertRaises(TypeError): 458 dtime <= bytearray(dbytes) 459 with self.assertRaises(TypeError): 460 dtime >= dtuple 461 462 class BinaryTestCase(unittest.TestCase): 463 464 # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" 465 # for now (i.e. interpreting the binary data as Latin-1-encoded 466 # text). But this feels very unsatisfactory. Perhaps we should 467 # only define repr(), and return r"Binary(b'\xff')" instead? 468 469 def test_default(self): 470 t = xmlrpclib.Binary() 471 self.assertEqual(str(t), '') 472 473 def test_string(self): 474 d = b'\x01\x02\x03abc123\xff\xfe' 475 t = xmlrpclib.Binary(d) 476 self.assertEqual(str(t), str(d, "latin-1")) 477 478 def test_decode(self): 479 d = b'\x01\x02\x03abc123\xff\xfe' 480 de = base64.encodebytes(d) 481 t1 = xmlrpclib.Binary() 482 t1.decode(de) 483 self.assertEqual(str(t1), str(d, "latin-1")) 484 485 t2 = xmlrpclib._binary(de) 486 self.assertEqual(str(t2), str(d, "latin-1")) 487 488 489 ADDR = PORT = URL = None 490 491 # The evt is set twice. First when the server is ready to serve. 492 # Second when the server has been shutdown. The user must clear 493 # the event after it has been set the first time to catch the second set. 494 def http_server(evt, numrequests, requestHandler=None, encoding=None): 495 class TestInstanceClass: 496 def div(self, x, y): 497 return x // y 498 499 def _methodHelp(self, name): 500 if name == 'div': 501 return 'This is the div function' 502 503 class Fixture: 504 @staticmethod 505 def getData(): 506 return '42' 507 508 def my_function(): 509 '''This is my function''' 510 return True 511 512 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 513 def get_request(self): 514 # Ensure the socket is always non-blocking. On Linux, socket 515 # attributes are not inherited like they are on *BSD and Windows. 516 s, port = self.socket.accept() 517 s.setblocking(True) 518 return s, port 519 520 if not requestHandler: 521 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 522 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 523 encoding=encoding, 524 logRequests=False, bind_and_activate=False) 525 try: 526 serv.server_bind() 527 global ADDR, PORT, URL 528 ADDR, PORT = serv.socket.getsockname() 529 #connect to IP address directly. This avoids socket.create_connection() 530 #trying to connect to "localhost" using all address families, which 531 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 532 #on AF_INET only. 533 URL = "http://%s:%d"%(ADDR, PORT) 534 serv.server_activate() 535 serv.register_introspection_functions() 536 serv.register_multicall_functions() 537 serv.register_function(pow) 538 serv.register_function(lambda x,y: x+y, 'add') 539 serv.register_function(lambda x: x, 'tt') 540 serv.register_function(my_function) 541 testInstance = TestInstanceClass() 542 serv.register_instance(testInstance, allow_dotted_names=True) 543 evt.set() 544 545 # handle up to 'numrequests' requests 546 while numrequests > 0: 547 serv.handle_request() 548 numrequests -= 1 549 550 except socket.timeout: 551 pass 552 finally: 553 serv.socket.close() 554 PORT = None 555 evt.set() 556 557 def http_multi_server(evt, numrequests, requestHandler=None): 558 class TestInstanceClass: 559 def div(self, x, y): 560 return x // y 561 562 def _methodHelp(self, name): 563 if name == 'div': 564 return 'This is the div function' 565 566 def my_function(): 567 '''This is my function''' 568 return True 569 570 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 571 def get_request(self): 572 # Ensure the socket is always non-blocking. On Linux, socket 573 # attributes are not inherited like they are on *BSD and Windows. 574 s, port = self.socket.accept() 575 s.setblocking(True) 576 return s, port 577 578 if not requestHandler: 579 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 580 class MyRequestHandler(requestHandler): 581 rpc_paths = [] 582 583 class BrokenDispatcher: 584 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 585 raise RuntimeError("broken dispatcher") 586 587 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 588 logRequests=False, bind_and_activate=False) 589 serv.socket.settimeout(3) 590 serv.server_bind() 591 try: 592 global ADDR, PORT, URL 593 ADDR, PORT = serv.socket.getsockname() 594 #connect to IP address directly. This avoids socket.create_connection() 595 #trying to connect to "localhost" using all address families, which 596 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 597 #on AF_INET only. 598 URL = "http://%s:%d"%(ADDR, PORT) 599 serv.server_activate() 600 paths = ["/foo", "/foo/bar"] 601 for path in paths: 602 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 603 d.register_introspection_functions() 604 d.register_multicall_functions() 605 serv.get_dispatcher(paths[0]).register_function(pow) 606 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 607 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 608 evt.set() 609 610 # handle up to 'numrequests' requests 611 while numrequests > 0: 612 serv.handle_request() 613 numrequests -= 1 614 615 except socket.timeout: 616 pass 617 finally: 618 serv.socket.close() 619 PORT = None 620 evt.set() 621 622 # This function prevents errors like: 623 # <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 624 def is_unavailable_exception(e): 625 '''Returns True if the given ProtocolError is the product of a server-side 626 exception caused by the 'temporarily unavailable' response sometimes 627 given by operations on non-blocking sockets.''' 628 629 # sometimes we get a -1 error code and/or empty headers 630 try: 631 if e.errcode == -1 or e.headers is None: 632 return True 633 exc_mess = e.headers.get('X-exception') 634 except AttributeError: 635 # Ignore OSErrors here. 636 exc_mess = str(e) 637 638 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 639 return True 640 641 def make_request_and_skipIf(condition, reason): 642 # If we skip the test, we have to make a request because 643 # the server created in setUp blocks expecting one to come in. 644 if not condition: 645 return lambda func: func 646 def decorator(func): 647 def make_request_and_skip(self): 648 try: 649 xmlrpclib.ServerProxy(URL).my_function() 650 except (xmlrpclib.ProtocolError, OSError) as e: 651 if not is_unavailable_exception(e): 652 raise 653 raise unittest.SkipTest(reason) 654 return make_request_and_skip 655 return decorator 656 657 @unittest.skipUnless(threading, 'Threading required for this test.') 658 class BaseServerTestCase(unittest.TestCase): 659 requestHandler = None 660 request_count = 1 661 threadFunc = staticmethod(http_server) 662 663 def setUp(self): 664 # enable traceback reporting 665 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 666 667 self.evt = threading.Event() 668 # start server thread to handle requests 669 serv_args = (self.evt, self.request_count, self.requestHandler) 670 threading.Thread(target=self.threadFunc, args=serv_args).start() 671 672 # wait for the server to be ready 673 self.evt.wait() 674 self.evt.clear() 675 676 def tearDown(self): 677 # wait on the server thread to terminate 678 self.evt.wait() 679 680 # disable traceback reporting 681 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 682 683 class SimpleServerTestCase(BaseServerTestCase): 684 def test_simple1(self): 685 try: 686 p = xmlrpclib.ServerProxy(URL) 687 self.assertEqual(p.pow(6,8), 6**8) 688 except (xmlrpclib.ProtocolError, OSError) as e: 689 # ignore failures due to non-blocking socket 'unavailable' errors 690 if not is_unavailable_exception(e): 691 # protocol error; provide additional information in test output 692 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 693 694 def test_nonascii(self): 695 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 696 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 697 try: 698 p = xmlrpclib.ServerProxy(URL) 699 self.assertEqual(p.add(start_string, end_string), 700 start_string + end_string) 701 except (xmlrpclib.ProtocolError, OSError) as e: 702 # ignore failures due to non-blocking socket 'unavailable' errors 703 if not is_unavailable_exception(e): 704 # protocol error; provide additional information in test output 705 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 706 707 def test_client_encoding(self): 708 start_string = '\u20ac' 709 end_string = '\xa4' 710 711 try: 712 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 713 self.assertEqual(p.add(start_string, end_string), 714 start_string + end_string) 715 except (xmlrpclib.ProtocolError, socket.error) as e: 716 # ignore failures due to non-blocking socket unavailable errors. 717 if not is_unavailable_exception(e): 718 # protocol error; provide additional information in test output 719 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 720 721 def test_nonascii_methodname(self): 722 try: 723 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 724 self.assertEqual(p.tt(42), 42) 725 except (xmlrpclib.ProtocolError, socket.error) as e: 726 # ignore failures due to non-blocking socket unavailable errors. 727 if not is_unavailable_exception(e): 728 # protocol error; provide additional information in test output 729 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 730 731 # [ch] The test 404 is causing lots of false alarms. 732 def XXXtest_404(self): 733 # send POST with http.client, it should return 404 header and 734 # 'Not Found' message. 735 conn = httplib.client.HTTPConnection(ADDR, PORT) 736 conn.request('POST', '/this-is-not-valid') 737 response = conn.getresponse() 738 conn.close() 739 740 self.assertEqual(response.status, 404) 741 self.assertEqual(response.reason, 'Not Found') 742 743 def test_introspection1(self): 744 expected_methods = set(['pow', 'div', 'my_function', 'add', 'tt', 745 'system.listMethods', 'system.methodHelp', 746 'system.methodSignature', 'system.multicall', 747 'Fixture']) 748 try: 749 p = xmlrpclib.ServerProxy(URL) 750 meth = p.system.listMethods() 751 self.assertEqual(set(meth), expected_methods) 752 except (xmlrpclib.ProtocolError, OSError) as e: 753 # ignore failures due to non-blocking socket 'unavailable' errors 754 if not is_unavailable_exception(e): 755 # protocol error; provide additional information in test output 756 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 757 758 759 def test_introspection2(self): 760 try: 761 # test _methodHelp() 762 p = xmlrpclib.ServerProxy(URL) 763 divhelp = p.system.methodHelp('div') 764 self.assertEqual(divhelp, 'This is the div function') 765 except (xmlrpclib.ProtocolError, OSError) as e: 766 # ignore failures due to non-blocking socket 'unavailable' errors 767 if not is_unavailable_exception(e): 768 # protocol error; provide additional information in test output 769 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 770 771 @make_request_and_skipIf(sys.flags.optimize >= 2, 772 "Docstrings are omitted with -O2 and above") 773 def test_introspection3(self): 774 try: 775 # test native doc 776 p = xmlrpclib.ServerProxy(URL) 777 myfunction = p.system.methodHelp('my_function') 778 self.assertEqual(myfunction, 'This is my function') 779 except (xmlrpclib.ProtocolError, OSError) as e: 780 # ignore failures due to non-blocking socket 'unavailable' errors 781 if not is_unavailable_exception(e): 782 # protocol error; provide additional information in test output 783 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 784 785 def test_introspection4(self): 786 # the SimpleXMLRPCServer doesn't support signatures, but 787 # at least check that we can try making the call 788 try: 789 p = xmlrpclib.ServerProxy(URL) 790 divsig = p.system.methodSignature('div') 791 self.assertEqual(divsig, 'signatures not supported') 792 except (xmlrpclib.ProtocolError, OSError) as e: 793 # ignore failures due to non-blocking socket 'unavailable' errors 794 if not is_unavailable_exception(e): 795 # protocol error; provide additional information in test output 796 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 797 798 def test_multicall(self): 799 try: 800 p = xmlrpclib.ServerProxy(URL) 801 multicall = xmlrpclib.MultiCall(p) 802 multicall.add(2,3) 803 multicall.pow(6,8) 804 multicall.div(127,42) 805 add_result, pow_result, div_result = multicall() 806 self.assertEqual(add_result, 2+3) 807 self.assertEqual(pow_result, 6**8) 808 self.assertEqual(div_result, 127//42) 809 except (xmlrpclib.ProtocolError, OSError) as e: 810 # ignore failures due to non-blocking socket 'unavailable' errors 811 if not is_unavailable_exception(e): 812 # protocol error; provide additional information in test output 813 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 814 815 def test_non_existing_multicall(self): 816 try: 817 p = xmlrpclib.ServerProxy(URL) 818 multicall = xmlrpclib.MultiCall(p) 819 multicall.this_is_not_exists() 820 result = multicall() 821 822 # result.results contains; 823 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:' 824 # 'method "this_is_not_exists" is not supported'>}] 825 826 self.assertEqual(result.results[0]['faultCode'], 1) 827 self.assertEqual(result.results[0]['faultString'], 828 '<class \'Exception\'>:method "this_is_not_exists" ' 829 'is not supported') 830 except (xmlrpclib.ProtocolError, OSError) as e: 831 # ignore failures due to non-blocking socket 'unavailable' errors 832 if not is_unavailable_exception(e): 833 # protocol error; provide additional information in test output 834 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 835 836 def test_dotted_attribute(self): 837 # Raises an AttributeError because private methods are not allowed. 838 self.assertRaises(AttributeError, 839 xmlrpc.server.resolve_dotted_attribute, str, '__add') 840 841 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 842 # Get the test to run faster by sending a request with test_simple1. 843 # This avoids waiting for the socket timeout. 844 self.test_simple1() 845 846 def test_allow_dotted_names_true(self): 847 # XXX also need allow_dotted_names_false test. 848 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 849 data = server.Fixture.getData() 850 self.assertEqual(data, '42') 851 852 def test_unicode_host(self): 853 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 854 self.assertEqual(server.add("a", "\xe9"), "a\xe9") 855 856 def test_partial_post(self): 857 # Check that a partial POST doesn't make the server loop: issue #14001. 858 conn = http.client.HTTPConnection(ADDR, PORT) 859 conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') 860 conn.close() 861 862 def test_context_manager(self): 863 with xmlrpclib.ServerProxy(URL) as server: 864 server.add(2, 3) 865 self.assertNotEqual(server('transport')._connection, 866 (None, None)) 867 self.assertEqual(server('transport')._connection, 868 (None, None)) 869 870 def test_context_manager_method_error(self): 871 try: 872 with xmlrpclib.ServerProxy(URL) as server: 873 server.add(2, "a") 874 except xmlrpclib.Fault: 875 pass 876 self.assertEqual(server('transport')._connection, 877 (None, None)) 878 879 880 class SimpleServerEncodingTestCase(BaseServerTestCase): 881 @staticmethod 882 def threadFunc(evt, numrequests, requestHandler=None, encoding=None): 883 http_server(evt, numrequests, requestHandler, 'iso-8859-15') 884 885 def test_server_encoding(self): 886 start_string = '\u20ac' 887 end_string = '\xa4' 888 889 try: 890 p = xmlrpclib.ServerProxy(URL) 891 self.assertEqual(p.add(start_string, end_string), 892 start_string + end_string) 893 except (xmlrpclib.ProtocolError, socket.error) as e: 894 # ignore failures due to non-blocking socket unavailable errors. 895 if not is_unavailable_exception(e): 896 # protocol error; provide additional information in test output 897 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 898 899 900 class MultiPathServerTestCase(BaseServerTestCase): 901 threadFunc = staticmethod(http_multi_server) 902 request_count = 2 903 def test_path1(self): 904 p = xmlrpclib.ServerProxy(URL+"/foo") 905 self.assertEqual(p.pow(6,8), 6**8) 906 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 907 908 def test_path2(self): 909 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 910 self.assertEqual(p.add(6,8), 6+8) 911 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 912 913 def test_path3(self): 914 p = xmlrpclib.ServerProxy(URL+"/is/broken") 915 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 916 917 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 918 #does indeed serve subsequent requests on the same connection 919 class BaseKeepaliveServerTestCase(BaseServerTestCase): 920 #a request handler that supports keep-alive and logs requests into a 921 #class variable 922 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 923 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 924 protocol_version = 'HTTP/1.1' 925 myRequests = [] 926 def handle(self): 927 self.myRequests.append([]) 928 self.reqidx = len(self.myRequests)-1 929 return self.parentClass.handle(self) 930 def handle_one_request(self): 931 result = self.parentClass.handle_one_request(self) 932 self.myRequests[self.reqidx].append(self.raw_requestline) 933 return result 934 935 requestHandler = RequestHandler 936 def setUp(self): 937 #clear request log 938 self.RequestHandler.myRequests = [] 939 return BaseServerTestCase.setUp(self) 940 941 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 942 #does indeed serve subsequent requests on the same connection 943 class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 944 def test_two(self): 945 p = xmlrpclib.ServerProxy(URL) 946 #do three requests. 947 self.assertEqual(p.pow(6,8), 6**8) 948 self.assertEqual(p.pow(6,8), 6**8) 949 self.assertEqual(p.pow(6,8), 6**8) 950 p("close")() 951 952 #they should have all been handled by a single request handler 953 self.assertEqual(len(self.RequestHandler.myRequests), 1) 954 955 #check that we did at least two (the third may be pending append 956 #due to thread scheduling) 957 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 958 959 960 #test special attribute access on the serverproxy, through the __call__ 961 #function. 962 class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 963 #ask for two keepalive requests to be handled. 964 request_count=2 965 966 def test_close(self): 967 p = xmlrpclib.ServerProxy(URL) 968 #do some requests with close. 969 self.assertEqual(p.pow(6,8), 6**8) 970 self.assertEqual(p.pow(6,8), 6**8) 971 self.assertEqual(p.pow(6,8), 6**8) 972 p("close")() #this should trigger a new keep-alive request 973 self.assertEqual(p.pow(6,8), 6**8) 974 self.assertEqual(p.pow(6,8), 6**8) 975 self.assertEqual(p.pow(6,8), 6**8) 976 p("close")() 977 978 #they should have all been two request handlers, each having logged at least 979 #two complete requests 980 self.assertEqual(len(self.RequestHandler.myRequests), 2) 981 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 982 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 983 984 985 def test_transport(self): 986 p = xmlrpclib.ServerProxy(URL) 987 #do some requests with close. 988 self.assertEqual(p.pow(6,8), 6**8) 989 p("transport").close() #same as above, really. 990 self.assertEqual(p.pow(6,8), 6**8) 991 p("close")() 992 self.assertEqual(len(self.RequestHandler.myRequests), 2) 993 994 #A test case that verifies that gzip encoding works in both directions 995 #(for a request and the response) 996 @unittest.skipIf(gzip is None, 'requires gzip') 997 class GzipServerTestCase(BaseServerTestCase): 998 #a request handler that supports keep-alive and logs requests into a 999 #class variable 1000 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1001 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1002 protocol_version = 'HTTP/1.1' 1003 1004 def do_POST(self): 1005 #store content of last request in class 1006 self.__class__.content_length = int(self.headers["content-length"]) 1007 return self.parentClass.do_POST(self) 1008 requestHandler = RequestHandler 1009 1010 class Transport(xmlrpclib.Transport): 1011 #custom transport, stores the response length for our perusal 1012 fake_gzip = False 1013 def parse_response(self, response): 1014 self.response_length=int(response.getheader("content-length", 0)) 1015 return xmlrpclib.Transport.parse_response(self, response) 1016 1017 def send_content(self, connection, body): 1018 if self.fake_gzip: 1019 #add a lone gzip header to induce decode error remotely 1020 connection.putheader("Content-Encoding", "gzip") 1021 return xmlrpclib.Transport.send_content(self, connection, body) 1022 1023 def setUp(self): 1024 BaseServerTestCase.setUp(self) 1025 1026 def test_gzip_request(self): 1027 t = self.Transport() 1028 t.encode_threshold = None 1029 p = xmlrpclib.ServerProxy(URL, transport=t) 1030 self.assertEqual(p.pow(6,8), 6**8) 1031 a = self.RequestHandler.content_length 1032 t.encode_threshold = 0 #turn on request encoding 1033 self.assertEqual(p.pow(6,8), 6**8) 1034 b = self.RequestHandler.content_length 1035 self.assertTrue(a>b) 1036 p("close")() 1037 1038 def test_bad_gzip_request(self): 1039 t = self.Transport() 1040 t.encode_threshold = None 1041 t.fake_gzip = True 1042 p = xmlrpclib.ServerProxy(URL, transport=t) 1043 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, 1044 re.compile(r"\b400\b")) 1045 with cm: 1046 p.pow(6, 8) 1047 p("close")() 1048 1049 def test_gzip_response(self): 1050 t = self.Transport() 1051 p = xmlrpclib.ServerProxy(URL, transport=t) 1052 old = self.requestHandler.encode_threshold 1053 self.requestHandler.encode_threshold = None #no encoding 1054 self.assertEqual(p.pow(6,8), 6**8) 1055 a = t.response_length 1056 self.requestHandler.encode_threshold = 0 #always encode 1057 self.assertEqual(p.pow(6,8), 6**8) 1058 p("close")() 1059 b = t.response_length 1060 self.requestHandler.encode_threshold = old 1061 self.assertTrue(a>b) 1062 1063 1064 @unittest.skipIf(gzip is None, 'requires gzip') 1065 class GzipUtilTestCase(unittest.TestCase): 1066 1067 def test_gzip_decode_limit(self): 1068 max_gzip_decode = 20 * 1024 * 1024 1069 data = b'\0' * max_gzip_decode 1070 encoded = xmlrpclib.gzip_encode(data) 1071 decoded = xmlrpclib.gzip_decode(encoded) 1072 self.assertEqual(len(decoded), max_gzip_decode) 1073 1074 data = b'\0' * (max_gzip_decode + 1) 1075 encoded = xmlrpclib.gzip_encode(data) 1076 1077 with self.assertRaisesRegex(ValueError, 1078 "max gzipped payload length exceeded"): 1079 xmlrpclib.gzip_decode(encoded) 1080 1081 xmlrpclib.gzip_decode(encoded, max_decode=-1) 1082 1083 1084 #Test special attributes of the ServerProxy object 1085 class ServerProxyTestCase(unittest.TestCase): 1086 def setUp(self): 1087 unittest.TestCase.setUp(self) 1088 if threading: 1089 self.url = URL 1090 else: 1091 # Without threading, http_server() and http_multi_server() will not 1092 # be executed and URL is still equal to None. 'http://' is a just 1093 # enough to choose the scheme (HTTP) 1094 self.url = 'http://' 1095 1096 def test_close(self): 1097 p = xmlrpclib.ServerProxy(self.url) 1098 self.assertEqual(p('close')(), None) 1099 1100 def test_transport(self): 1101 t = xmlrpclib.Transport() 1102 p = xmlrpclib.ServerProxy(self.url, transport=t) 1103 self.assertEqual(p('transport'), t) 1104 1105 1106 # This is a contrived way to make a failure occur on the server side 1107 # in order to test the _send_traceback_header flag on the server 1108 class FailingMessageClass(http.client.HTTPMessage): 1109 def get(self, key, failobj=None): 1110 key = key.lower() 1111 if key == 'content-length': 1112 return 'I am broken' 1113 return super().get(key, failobj) 1114 1115 1116 @unittest.skipUnless(threading, 'Threading required for this test.') 1117 class FailingServerTestCase(unittest.TestCase): 1118 def setUp(self): 1119 self.evt = threading.Event() 1120 # start server thread to handle requests 1121 serv_args = (self.evt, 1) 1122 threading.Thread(target=http_server, args=serv_args).start() 1123 1124 # wait for the server to be ready 1125 self.evt.wait() 1126 self.evt.clear() 1127 1128 def tearDown(self): 1129 # wait on the server thread to terminate 1130 self.evt.wait() 1131 # reset flag 1132 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 1133 # reset message class 1134 default_class = http.client.HTTPMessage 1135 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class 1136 1137 def test_basic(self): 1138 # check that flag is false by default 1139 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header 1140 self.assertEqual(flagval, False) 1141 1142 # enable traceback reporting 1143 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1144 1145 # test a call that shouldn't fail just as a smoke test 1146 try: 1147 p = xmlrpclib.ServerProxy(URL) 1148 self.assertEqual(p.pow(6,8), 6**8) 1149 except (xmlrpclib.ProtocolError, OSError) as e: 1150 # ignore failures due to non-blocking socket 'unavailable' errors 1151 if not is_unavailable_exception(e): 1152 # protocol error; provide additional information in test output 1153 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1154 1155 def test_fail_no_info(self): 1156 # use the broken message class 1157 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1158 1159 try: 1160 p = xmlrpclib.ServerProxy(URL) 1161 p.pow(6,8) 1162 except (xmlrpclib.ProtocolError, OSError) as e: 1163 # ignore failures due to non-blocking socket 'unavailable' errors 1164 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1165 # The two server-side error headers shouldn't be sent back in this case 1166 self.assertTrue(e.headers.get("X-exception") is None) 1167 self.assertTrue(e.headers.get("X-traceback") is None) 1168 else: 1169 self.fail('ProtocolError not raised') 1170 1171 def test_fail_with_info(self): 1172 # use the broken message class 1173 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1174 1175 # Check that errors in the server send back exception/traceback 1176 # info when flag is set 1177 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1178 1179 try: 1180 p = xmlrpclib.ServerProxy(URL) 1181 p.pow(6,8) 1182 except (xmlrpclib.ProtocolError, OSError) as e: 1183 # ignore failures due to non-blocking socket 'unavailable' errors 1184 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1185 # We should get error info in the response 1186 expected_err = "invalid literal for int() with base 10: 'I am broken'" 1187 self.assertEqual(e.headers.get("X-exception"), expected_err) 1188 self.assertTrue(e.headers.get("X-traceback") is not None) 1189 else: 1190 self.fail('ProtocolError not raised') 1191 1192 1193 @contextlib.contextmanager 1194 def captured_stdout(encoding='utf-8'): 1195 """A variation on support.captured_stdout() which gives a text stream 1196 having a `buffer` attribute. 1197 """ 1198 orig_stdout = sys.stdout 1199 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) 1200 try: 1201 yield sys.stdout 1202 finally: 1203 sys.stdout = orig_stdout 1204 1205 1206 class CGIHandlerTestCase(unittest.TestCase): 1207 def setUp(self): 1208 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() 1209 1210 def tearDown(self): 1211 self.cgi = None 1212 1213 def test_cgi_get(self): 1214 with support.EnvironmentVarGuard() as env: 1215 env['REQUEST_METHOD'] = 'GET' 1216 # if the method is GET and no request_text is given, it runs handle_get 1217 # get sysout output 1218 with captured_stdout(encoding=self.cgi.encoding) as data_out: 1219 self.cgi.handle_request() 1220 1221 # parse Status header 1222 data_out.seek(0) 1223 handle = data_out.read() 1224 status = handle.split()[1] 1225 message = ' '.join(handle.split()[2:4]) 1226 1227 self.assertEqual(status, '400') 1228 self.assertEqual(message, 'Bad Request') 1229 1230 1231 def test_cgi_xmlrpc_response(self): 1232 data = """<?xml version='1.0'?> 1233 <methodCall> 1234 <methodName>test_method</methodName> 1235 <params> 1236 <param> 1237 <value><string>foo</string></value> 1238 </param> 1239 <param> 1240 <value><string>bar</string></value> 1241 </param> 1242 </params> 1243 </methodCall> 1244 """ 1245 1246 with support.EnvironmentVarGuard() as env, \ 1247 captured_stdout(encoding=self.cgi.encoding) as data_out, \ 1248 support.captured_stdin() as data_in: 1249 data_in.write(data) 1250 data_in.seek(0) 1251 env['CONTENT_LENGTH'] = str(len(data)) 1252 self.cgi.handle_request() 1253 data_out.seek(0) 1254 1255 # will respond exception, if so, our goal is achieved ;) 1256 handle = data_out.read() 1257 1258 # start with 44th char so as not to get http header, we just 1259 # need only xml 1260 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 1261 1262 # Also test the content-length returned by handle_request 1263 # Using the same test method inorder to avoid all the datapassing 1264 # boilerplate code. 1265 # Test for bug: http://bugs.python.org/issue5040 1266 1267 content = handle[handle.find("<?xml"):] 1268 1269 self.assertEqual( 1270 int(re.search(r'Content-Length: (\d+)', handle).group(1)), 1271 len(content)) 1272 1273 1274 class UseBuiltinTypesTestCase(unittest.TestCase): 1275 1276 def test_use_builtin_types(self): 1277 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which 1278 # makes all dispatch of binary data as bytes instances, and all 1279 # dispatch of datetime argument as datetime.datetime instances. 1280 self.log = [] 1281 expected_bytes = b"my dog has fleas" 1282 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12) 1283 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar') 1284 def foobar(*args): 1285 self.log.extend(args) 1286 handler = xmlrpc.server.SimpleXMLRPCDispatcher( 1287 allow_none=True, encoding=None, use_builtin_types=True) 1288 handler.register_function(foobar) 1289 handler._marshaled_dispatch(marshaled) 1290 self.assertEqual(len(self.log), 2) 1291 mybytes, mydate = self.log 1292 self.assertEqual(self.log, [expected_bytes, expected_date]) 1293 self.assertIs(type(mydate), datetime.datetime) 1294 self.assertIs(type(mybytes), bytes) 1295 1296 def test_cgihandler_has_use_builtin_types_flag(self): 1297 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True) 1298 self.assertTrue(handler.use_builtin_types) 1299 1300 def test_xmlrpcserver_has_use_builtin_types_flag(self): 1301 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0), 1302 use_builtin_types=True) 1303 server.server_close() 1304 self.assertTrue(server.use_builtin_types) 1305 1306 1307 @support.reap_threads 1308 def test_main(): 1309 support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase, 1310 BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase, 1311 SimpleServerTestCase, SimpleServerEncodingTestCase, 1312 KeepaliveServerTestCase1, KeepaliveServerTestCase2, 1313 GzipServerTestCase, GzipUtilTestCase, 1314 MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase, 1315 CGIHandlerTestCase) 1316 1317 1318 if __name__ == "__main__": 1319 test_main() 1320