1 import base64 2 import datetime 3 import sys 4 import time 5 import unittest 6 import xmlrpclib 7 import SimpleXMLRPCServer 8 import mimetools 9 import httplib 10 import socket 11 import StringIO 12 import os 13 import re 14 from test import test_support 15 16 try: 17 import threading 18 except ImportError: 19 threading = None 20 21 try: 22 unicode 23 except NameError: 24 have_unicode = False 25 else: 26 have_unicode = True 27 28 alist = [{'astring': 'foo (at] bar.baz.spam', 29 'afloat': 7283.43, 30 'anint': 2**20, 31 'ashortlong': 2L, 32 'anotherlist': ['.zyx.41'], 33 'abase64': xmlrpclib.Binary("my dog has fleas"), 34 'boolean': xmlrpclib.False, 35 'unicode': u'\u4000\u6000\u8000', 36 u'ukey\u4000': 'regular value', 37 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 38 'datetime2': xmlrpclib.DateTime( 39 (2005, 02, 10, 11, 41, 23, 0, 1, -1)), 40 'datetime3': xmlrpclib.DateTime( 41 datetime.datetime(2005, 02, 10, 11, 41, 23)), 42 }] 43 44 class XMLRPCTestCase(unittest.TestCase): 45 46 def test_dump_load(self): 47 self.assertEqual(alist, 48 xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0]) 49 50 def test_dump_bare_datetime(self): 51 # This checks that an unwrapped datetime.date object can be handled 52 # by the marshalling code. This can't be done via test_dump_load() 53 # since with use_datetime set to 1 the unmarshaller would create 54 # datetime objects for the 'datetime[123]' keys as well 55 dt = datetime.datetime(2005, 02, 10, 11, 41, 23) 56 s = xmlrpclib.dumps((dt,)) 57 (newdt,), m = xmlrpclib.loads(s, use_datetime=1) 58 self.assertEqual(newdt, dt) 59 self.assertEqual(m, None) 60 61 (newdt,), m = xmlrpclib.loads(s, use_datetime=0) 62 self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23')) 63 64 def test_datetime_before_1900(self): 65 # same as before but with a date before 1900 66 dt = datetime.datetime(1, 02, 10, 11, 41, 23) 67 s = xmlrpclib.dumps((dt,)) 68 (newdt,), m = xmlrpclib.loads(s, use_datetime=1) 69 self.assertEqual(newdt, dt) 70 self.assertEqual(m, None) 71 72 (newdt,), m = xmlrpclib.loads(s, use_datetime=0) 73 self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23')) 74 75 def test_cmp_datetime_DateTime(self): 76 now = datetime.datetime.now() 77 dt = xmlrpclib.DateTime(now.timetuple()) 78 self.assertTrue(dt == now) 79 self.assertTrue(now == dt) 80 then = now + datetime.timedelta(seconds=4) 81 self.assertTrue(then >= dt) 82 self.assertTrue(dt < then) 83 84 def test_bug_1164912 (self): 85 d = xmlrpclib.DateTime() 86 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 87 methodresponse=True)) 88 self.assertIsInstance(new_d.value, str) 89 90 # Check that the output of dumps() is still an 8-bit string 91 s = xmlrpclib.dumps((new_d,), methodresponse=True) 92 self.assertIsInstance(s, str) 93 94 def test_newstyle_class(self): 95 class T(object): 96 pass 97 t = T() 98 t.x = 100 99 t.y = "Hello" 100 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 101 self.assertEqual(t2, t.__dict__) 102 103 def test_dump_big_long(self): 104 self.assertRaises(OverflowError, xmlrpclib.dumps, (2L**99,)) 105 106 def test_dump_bad_dict(self): 107 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 108 109 def test_dump_recursive_seq(self): 110 l = [1,2,3] 111 t = [3,4,5,l] 112 l.append(t) 113 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 114 115 def test_dump_recursive_dict(self): 116 d = {'1':1, '2':1} 117 t = {'3':3, 'd':d} 118 d['t'] = t 119 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 120 121 def test_dump_big_int(self): 122 if sys.maxint > 2L**31-1: 123 self.assertRaises(OverflowError, xmlrpclib.dumps, 124 (int(2L**34),)) 125 126 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 127 self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,)) 128 self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,)) 129 130 def dummy_write(s): 131 pass 132 133 m = xmlrpclib.Marshaller() 134 m.dump_int(xmlrpclib.MAXINT, dummy_write) 135 m.dump_int(xmlrpclib.MININT, dummy_write) 136 self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write) 137 self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write) 138 139 140 def test_dump_none(self): 141 value = alist + [None] 142 arg1 = (alist + [None],) 143 strg = xmlrpclib.dumps(arg1, allow_none=True) 144 self.assertEqual(value, 145 xmlrpclib.loads(strg)[0][0]) 146 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 147 148 def test_default_encoding_issues(self): 149 # SF bug #1115989: wrong decoding in '_stringify' 150 utf8 = """<?xml version='1.0' encoding='iso-8859-1'?> 151 <params> 152 <param><value> 153 <string>abc \x95</string> 154 </value></param> 155 <param><value> 156 <struct> 157 <member> 158 <name>def \x96</name> 159 <value><string>ghi \x97</string></value> 160 </member> 161 </struct> 162 </value></param> 163 </params> 164 """ 165 166 # sys.setdefaultencoding() normally doesn't exist after site.py is 167 # loaded. Import a temporary fresh copy to get access to it 168 # but then restore the original copy to avoid messing with 169 # other potentially modified sys module attributes 170 old_encoding = sys.getdefaultencoding() 171 with test_support.CleanImport('sys'): 172 import sys as temp_sys 173 temp_sys.setdefaultencoding("iso-8859-1") 174 try: 175 (s, d), m = xmlrpclib.loads(utf8) 176 finally: 177 temp_sys.setdefaultencoding(old_encoding) 178 179 items = d.items() 180 if have_unicode: 181 self.assertEqual(s, u"abc \x95") 182 self.assertIsInstance(s, unicode) 183 self.assertEqual(items, [(u"def \x96", u"ghi \x97")]) 184 self.assertIsInstance(items[0][0], unicode) 185 self.assertIsInstance(items[0][1], unicode) 186 else: 187 self.assertEqual(s, "abc \xc2\x95") 188 self.assertEqual(items, [("def \xc2\x96", "ghi \xc2\x97")]) 189 190 191 class HelperTestCase(unittest.TestCase): 192 def test_escape(self): 193 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 194 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 195 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 196 197 class FaultTestCase(unittest.TestCase): 198 def test_repr(self): 199 f = xmlrpclib.Fault(42, 'Test Fault') 200 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 201 self.assertEqual(repr(f), str(f)) 202 203 def test_dump_fault(self): 204 f = xmlrpclib.Fault(42, 'Test Fault') 205 s = xmlrpclib.dumps((f,)) 206 (newf,), m = xmlrpclib.loads(s) 207 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 208 self.assertEqual(m, None) 209 210 s = xmlrpclib.Marshaller().dumps(f) 211 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 212 213 214 class DateTimeTestCase(unittest.TestCase): 215 def test_default(self): 216 t = xmlrpclib.DateTime() 217 218 def test_time(self): 219 d = 1181399930.036952 220 t = xmlrpclib.DateTime(d) 221 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 222 223 def test_time_tuple(self): 224 d = (2007,6,9,10,38,50,5,160,0) 225 t = xmlrpclib.DateTime(d) 226 self.assertEqual(str(t), '20070609T10:38:50') 227 228 def test_time_struct(self): 229 d = time.localtime(1181399930.036952) 230 t = xmlrpclib.DateTime(d) 231 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 232 233 def test_datetime_datetime(self): 234 d = datetime.datetime(2007,1,2,3,4,5) 235 t = xmlrpclib.DateTime(d) 236 self.assertEqual(str(t), '20070102T03:04:05') 237 238 def test_repr(self): 239 d = datetime.datetime(2007,1,2,3,4,5) 240 t = xmlrpclib.DateTime(d) 241 val ="<DateTime '20070102T03:04:05' at %x>" % id(t) 242 self.assertEqual(repr(t), val) 243 244 def test_decode(self): 245 d = ' 20070908T07:11:13 ' 246 t1 = xmlrpclib.DateTime() 247 t1.decode(d) 248 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 249 self.assertEqual(t1, tref) 250 251 t2 = xmlrpclib._datetime(d) 252 self.assertEqual(t1, tref) 253 254 class BinaryTestCase(unittest.TestCase): 255 def test_default(self): 256 t = xmlrpclib.Binary() 257 self.assertEqual(str(t), '') 258 259 def test_string(self): 260 d = '\x01\x02\x03abc123\xff\xfe' 261 t = xmlrpclib.Binary(d) 262 self.assertEqual(str(t), d) 263 264 def test_decode(self): 265 d = '\x01\x02\x03abc123\xff\xfe' 266 de = base64.encodestring(d) 267 t1 = xmlrpclib.Binary() 268 t1.decode(de) 269 self.assertEqual(str(t1), d) 270 271 t2 = xmlrpclib._binary(de) 272 self.assertEqual(str(t2), d) 273 274 275 ADDR = PORT = URL = None 276 277 # The evt is set twice. First when the server is ready to serve. 278 # Second when the server has been shutdown. The user must clear 279 # the event after it has been set the first time to catch the second set. 280 def http_server(evt, numrequests, requestHandler=None): 281 class TestInstanceClass: 282 def div(self, x, y): 283 return x // y 284 285 def _methodHelp(self, name): 286 if name == 'div': 287 return 'This is the div function' 288 289 def my_function(): 290 '''This is my function''' 291 return True 292 293 class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer): 294 def get_request(self): 295 # Ensure the socket is always non-blocking. On Linux, socket 296 # attributes are not inherited like they are on *BSD and Windows. 297 s, port = self.socket.accept() 298 s.setblocking(True) 299 return s, port 300 301 if not requestHandler: 302 requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 303 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 304 logRequests=False, bind_and_activate=False) 305 try: 306 serv.socket.settimeout(3) 307 serv.server_bind() 308 global ADDR, PORT, URL 309 ADDR, PORT = serv.socket.getsockname() 310 #connect to IP address directly. This avoids socket.create_connection() 311 #trying to connect to to "localhost" using all address families, which 312 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 313 #on AF_INET only. 314 URL = "http://%s:%d"%(ADDR, PORT) 315 serv.server_activate() 316 serv.register_introspection_functions() 317 serv.register_multicall_functions() 318 serv.register_function(pow) 319 serv.register_function(lambda x,y: x+y, 'add') 320 serv.register_function(my_function) 321 serv.register_instance(TestInstanceClass()) 322 evt.set() 323 324 # handle up to 'numrequests' requests 325 while numrequests > 0: 326 serv.handle_request() 327 numrequests -= 1 328 329 except socket.timeout: 330 pass 331 finally: 332 serv.socket.close() 333 PORT = None 334 evt.set() 335 336 def http_multi_server(evt, numrequests, requestHandler=None): 337 class TestInstanceClass: 338 def div(self, x, y): 339 return x // y 340 341 def _methodHelp(self, name): 342 if name == 'div': 343 return 'This is the div function' 344 345 def my_function(): 346 '''This is my function''' 347 return True 348 349 class MyXMLRPCServer(SimpleXMLRPCServer.MultiPathXMLRPCServer): 350 def get_request(self): 351 # Ensure the socket is always non-blocking. On Linux, socket 352 # attributes are not inherited like they are on *BSD and Windows. 353 s, port = self.socket.accept() 354 s.setblocking(True) 355 return s, port 356 357 if not requestHandler: 358 requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 359 class MyRequestHandler(requestHandler): 360 rpc_paths = [] 361 362 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 363 logRequests=False, bind_and_activate=False) 364 serv.socket.settimeout(3) 365 serv.server_bind() 366 try: 367 global ADDR, PORT, URL 368 ADDR, PORT = serv.socket.getsockname() 369 #connect to IP address directly. This avoids socket.create_connection() 370 #trying to connect to to "localhost" using all address families, which 371 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 372 #on AF_INET only. 373 URL = "http://%s:%d"%(ADDR, PORT) 374 serv.server_activate() 375 paths = ["/foo", "/foo/bar"] 376 for path in paths: 377 d = serv.add_dispatcher(path, SimpleXMLRPCServer.SimpleXMLRPCDispatcher()) 378 d.register_introspection_functions() 379 d.register_multicall_functions() 380 serv.get_dispatcher(paths[0]).register_function(pow) 381 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 382 evt.set() 383 384 # handle up to 'numrequests' requests 385 while numrequests > 0: 386 serv.handle_request() 387 numrequests -= 1 388 389 except socket.timeout: 390 pass 391 finally: 392 serv.socket.close() 393 PORT = None 394 evt.set() 395 396 # This function prevents errors like: 397 # <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 398 def is_unavailable_exception(e): 399 '''Returns True if the given ProtocolError is the product of a server-side 400 exception caused by the 'temporarily unavailable' response sometimes 401 given by operations on non-blocking sockets.''' 402 403 # sometimes we get a -1 error code and/or empty headers 404 try: 405 if e.errcode == -1 or e.headers is None: 406 return True 407 exc_mess = e.headers.get('X-exception') 408 except AttributeError: 409 # Ignore socket.errors here. 410 exc_mess = str(e) 411 412 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 413 return True 414 415 return False 416 417 @unittest.skipUnless(threading, 'Threading required for this test.') 418 class BaseServerTestCase(unittest.TestCase): 419 requestHandler = None 420 request_count = 1 421 threadFunc = staticmethod(http_server) 422 423 def setUp(self): 424 # enable traceback reporting 425 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True 426 427 self.evt = threading.Event() 428 # start server thread to handle requests 429 serv_args = (self.evt, self.request_count, self.requestHandler) 430 threading.Thread(target=self.threadFunc, args=serv_args).start() 431 432 # wait for the server to be ready 433 self.evt.wait(10) 434 self.evt.clear() 435 436 def tearDown(self): 437 # wait on the server thread to terminate 438 self.evt.wait(10) 439 440 # disable traceback reporting 441 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False 442 443 # NOTE: The tests in SimpleServerTestCase will ignore failures caused by 444 # "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This 445 # condition occurs infrequently on some platforms, frequently on others, and 446 # is apparently caused by using SimpleXMLRPCServer with a non-blocking socket 447 # If the server class is updated at some point in the future to handle this 448 # situation more gracefully, these tests should be modified appropriately. 449 450 class SimpleServerTestCase(BaseServerTestCase): 451 def test_simple1(self): 452 try: 453 p = xmlrpclib.ServerProxy(URL) 454 self.assertEqual(p.pow(6,8), 6**8) 455 except (xmlrpclib.ProtocolError, socket.error), e: 456 # ignore failures due to non-blocking socket 'unavailable' errors 457 if not is_unavailable_exception(e): 458 # protocol error; provide additional information in test output 459 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 460 461 def test_nonascii(self): 462 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 463 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 464 465 try: 466 p = xmlrpclib.ServerProxy(URL) 467 self.assertEqual(p.add(start_string, end_string), 468 start_string + end_string) 469 except (xmlrpclib.ProtocolError, socket.error) as e: 470 # ignore failures due to non-blocking socket unavailable errors. 471 if not is_unavailable_exception(e): 472 # protocol error; provide additional information in test output 473 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 474 475 476 # [ch] The test 404 is causing lots of false alarms. 477 def XXXtest_404(self): 478 # send POST with httplib, it should return 404 header and 479 # 'Not Found' message. 480 conn = httplib.HTTPConnection(ADDR, PORT) 481 conn.request('POST', '/this-is-not-valid') 482 response = conn.getresponse() 483 conn.close() 484 485 self.assertEqual(response.status, 404) 486 self.assertEqual(response.reason, 'Not Found') 487 488 def test_introspection1(self): 489 try: 490 p = xmlrpclib.ServerProxy(URL) 491 meth = p.system.listMethods() 492 expected_methods = set(['pow', 'div', 'my_function', 'add', 493 'system.listMethods', 'system.methodHelp', 494 'system.methodSignature', 'system.multicall']) 495 self.assertEqual(set(meth), expected_methods) 496 except (xmlrpclib.ProtocolError, socket.error), e: 497 # ignore failures due to non-blocking socket 'unavailable' errors 498 if not is_unavailable_exception(e): 499 # protocol error; provide additional information in test output 500 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 501 502 def test_introspection2(self): 503 try: 504 # test _methodHelp() 505 p = xmlrpclib.ServerProxy(URL) 506 divhelp = p.system.methodHelp('div') 507 self.assertEqual(divhelp, 'This is the div function') 508 except (xmlrpclib.ProtocolError, socket.error), e: 509 # ignore failures due to non-blocking socket 'unavailable' errors 510 if not is_unavailable_exception(e): 511 # protocol error; provide additional information in test output 512 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 513 514 @unittest.skipIf(sys.flags.optimize >= 2, 515 "Docstrings are omitted with -O2 and above") 516 def test_introspection3(self): 517 try: 518 # test native doc 519 p = xmlrpclib.ServerProxy(URL) 520 myfunction = p.system.methodHelp('my_function') 521 self.assertEqual(myfunction, 'This is my function') 522 except (xmlrpclib.ProtocolError, socket.error), e: 523 # ignore failures due to non-blocking socket 'unavailable' errors 524 if not is_unavailable_exception(e): 525 # protocol error; provide additional information in test output 526 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 527 528 def test_introspection4(self): 529 # the SimpleXMLRPCServer doesn't support signatures, but 530 # at least check that we can try making the call 531 try: 532 p = xmlrpclib.ServerProxy(URL) 533 divsig = p.system.methodSignature('div') 534 self.assertEqual(divsig, 'signatures not supported') 535 except (xmlrpclib.ProtocolError, socket.error), e: 536 # ignore failures due to non-blocking socket 'unavailable' errors 537 if not is_unavailable_exception(e): 538 # protocol error; provide additional information in test output 539 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 540 541 def test_multicall(self): 542 try: 543 p = xmlrpclib.ServerProxy(URL) 544 multicall = xmlrpclib.MultiCall(p) 545 multicall.add(2,3) 546 multicall.pow(6,8) 547 multicall.div(127,42) 548 add_result, pow_result, div_result = multicall() 549 self.assertEqual(add_result, 2+3) 550 self.assertEqual(pow_result, 6**8) 551 self.assertEqual(div_result, 127//42) 552 except (xmlrpclib.ProtocolError, socket.error), e: 553 # ignore failures due to non-blocking socket 'unavailable' errors 554 if not is_unavailable_exception(e): 555 # protocol error; provide additional information in test output 556 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 557 558 def test_non_existing_multicall(self): 559 try: 560 p = xmlrpclib.ServerProxy(URL) 561 multicall = xmlrpclib.MultiCall(p) 562 multicall.this_is_not_exists() 563 result = multicall() 564 565 # result.results contains; 566 # [{'faultCode': 1, 'faultString': '<type \'exceptions.Exception\'>:' 567 # 'method "this_is_not_exists" is not supported'>}] 568 569 self.assertEqual(result.results[0]['faultCode'], 1) 570 self.assertEqual(result.results[0]['faultString'], 571 '<type \'exceptions.Exception\'>:method "this_is_not_exists" ' 572 'is not supported') 573 except (xmlrpclib.ProtocolError, socket.error), e: 574 # ignore failures due to non-blocking socket 'unavailable' errors 575 if not is_unavailable_exception(e): 576 # protocol error; provide additional information in test output 577 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 578 579 def test_dotted_attribute(self): 580 # Raises an AttributeError because private methods are not allowed. 581 self.assertRaises(AttributeError, 582 SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add') 583 584 self.assertTrue(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title')) 585 # Get the test to run faster by sending a request with test_simple1. 586 # This avoids waiting for the socket timeout. 587 self.test_simple1() 588 589 class MultiPathServerTestCase(BaseServerTestCase): 590 threadFunc = staticmethod(http_multi_server) 591 request_count = 2 592 def test_path1(self): 593 p = xmlrpclib.ServerProxy(URL+"/foo") 594 self.assertEqual(p.pow(6,8), 6**8) 595 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 596 def test_path2(self): 597 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 598 self.assertEqual(p.add(6,8), 6+8) 599 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 600 601 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 602 #does indeed serve subsequent requests on the same connection 603 class BaseKeepaliveServerTestCase(BaseServerTestCase): 604 #a request handler that supports keep-alive and logs requests into a 605 #class variable 606 class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): 607 parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 608 protocol_version = 'HTTP/1.1' 609 myRequests = [] 610 def handle(self): 611 self.myRequests.append([]) 612 self.reqidx = len(self.myRequests)-1 613 return self.parentClass.handle(self) 614 def handle_one_request(self): 615 result = self.parentClass.handle_one_request(self) 616 self.myRequests[self.reqidx].append(self.raw_requestline) 617 return result 618 619 requestHandler = RequestHandler 620 def setUp(self): 621 #clear request log 622 self.RequestHandler.myRequests = [] 623 return BaseServerTestCase.setUp(self) 624 625 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 626 #does indeed serve subsequent requests on the same connection 627 class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 628 def test_two(self): 629 p = xmlrpclib.ServerProxy(URL) 630 #do three requests. 631 self.assertEqual(p.pow(6,8), 6**8) 632 self.assertEqual(p.pow(6,8), 6**8) 633 self.assertEqual(p.pow(6,8), 6**8) 634 635 #they should have all been handled by a single request handler 636 self.assertEqual(len(self.RequestHandler.myRequests), 1) 637 638 #check that we did at least two (the third may be pending append 639 #due to thread scheduling) 640 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 641 642 #test special attribute access on the serverproxy, through the __call__ 643 #function. 644 class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 645 #ask for two keepalive requests to be handled. 646 request_count=2 647 648 def test_close(self): 649 p = xmlrpclib.ServerProxy(URL) 650 #do some requests with close. 651 self.assertEqual(p.pow(6,8), 6**8) 652 self.assertEqual(p.pow(6,8), 6**8) 653 self.assertEqual(p.pow(6,8), 6**8) 654 p("close")() #this should trigger a new keep-alive request 655 self.assertEqual(p.pow(6,8), 6**8) 656 self.assertEqual(p.pow(6,8), 6**8) 657 self.assertEqual(p.pow(6,8), 6**8) 658 659 #they should have all been two request handlers, each having logged at least 660 #two complete requests 661 self.assertEqual(len(self.RequestHandler.myRequests), 2) 662 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 663 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 664 665 def test_transport(self): 666 p = xmlrpclib.ServerProxy(URL) 667 #do some requests with close. 668 self.assertEqual(p.pow(6,8), 6**8) 669 p("transport").close() #same as above, really. 670 self.assertEqual(p.pow(6,8), 6**8) 671 self.assertEqual(len(self.RequestHandler.myRequests), 2) 672 673 #A test case that verifies that gzip encoding works in both directions 674 #(for a request and the response) 675 class GzipServerTestCase(BaseServerTestCase): 676 #a request handler that supports keep-alive and logs requests into a 677 #class variable 678 class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): 679 parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 680 protocol_version = 'HTTP/1.1' 681 682 def do_POST(self): 683 #store content of last request in class 684 self.__class__.content_length = int(self.headers["content-length"]) 685 return self.parentClass.do_POST(self) 686 requestHandler = RequestHandler 687 688 class Transport(xmlrpclib.Transport): 689 #custom transport, stores the response length for our perusal 690 fake_gzip = False 691 def parse_response(self, response): 692 self.response_length=int(response.getheader("content-length", 0)) 693 return xmlrpclib.Transport.parse_response(self, response) 694 695 def send_content(self, connection, body): 696 if self.fake_gzip: 697 #add a lone gzip header to induce decode error remotely 698 connection.putheader("Content-Encoding", "gzip") 699 return xmlrpclib.Transport.send_content(self, connection, body) 700 701 def setUp(self): 702 BaseServerTestCase.setUp(self) 703 704 def test_gzip_request(self): 705 t = self.Transport() 706 t.encode_threshold = None 707 p = xmlrpclib.ServerProxy(URL, transport=t) 708 self.assertEqual(p.pow(6,8), 6**8) 709 a = self.RequestHandler.content_length 710 t.encode_threshold = 0 #turn on request encoding 711 self.assertEqual(p.pow(6,8), 6**8) 712 b = self.RequestHandler.content_length 713 self.assertTrue(a>b) 714 715 def test_bad_gzip_request(self): 716 t = self.Transport() 717 t.encode_threshold = None 718 t.fake_gzip = True 719 p = xmlrpclib.ServerProxy(URL, transport=t) 720 cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError, 721 re.compile(r"\b400\b")) 722 with cm: 723 p.pow(6, 8) 724 725 def test_gsip_response(self): 726 t = self.Transport() 727 p = xmlrpclib.ServerProxy(URL, transport=t) 728 old = self.requestHandler.encode_threshold 729 self.requestHandler.encode_threshold = None #no encoding 730 self.assertEqual(p.pow(6,8), 6**8) 731 a = t.response_length 732 self.requestHandler.encode_threshold = 0 #always encode 733 self.assertEqual(p.pow(6,8), 6**8) 734 b = t.response_length 735 self.requestHandler.encode_threshold = old 736 self.assertTrue(a>b) 737 738 #Test special attributes of the ServerProxy object 739 class ServerProxyTestCase(unittest.TestCase): 740 def setUp(self): 741 unittest.TestCase.setUp(self) 742 if threading: 743 self.url = URL 744 else: 745 # Without threading, http_server() and http_multi_server() will not 746 # be executed and URL is still equal to None. 'http://' is a just 747 # enough to choose the scheme (HTTP) 748 self.url = 'http://' 749 750 def test_close(self): 751 p = xmlrpclib.ServerProxy(self.url) 752 self.assertEqual(p('close')(), None) 753 754 def test_transport(self): 755 t = xmlrpclib.Transport() 756 p = xmlrpclib.ServerProxy(self.url, transport=t) 757 self.assertEqual(p('transport'), t) 758 759 # This is a contrived way to make a failure occur on the server side 760 # in order to test the _send_traceback_header flag on the server 761 class FailingMessageClass(mimetools.Message): 762 def __getitem__(self, key): 763 key = key.lower() 764 if key == 'content-length': 765 return 'I am broken' 766 return mimetools.Message.__getitem__(self, key) 767 768 769 @unittest.skipUnless(threading, 'Threading required for this test.') 770 class FailingServerTestCase(unittest.TestCase): 771 def setUp(self): 772 self.evt = threading.Event() 773 # start server thread to handle requests 774 serv_args = (self.evt, 1) 775 threading.Thread(target=http_server, args=serv_args).start() 776 777 # wait for the server to be ready 778 self.evt.wait() 779 self.evt.clear() 780 781 def tearDown(self): 782 # wait on the server thread to terminate 783 self.evt.wait() 784 # reset flag 785 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False 786 # reset message class 787 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message 788 789 def test_basic(self): 790 # check that flag is false by default 791 flagval = SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header 792 self.assertEqual(flagval, False) 793 794 # enable traceback reporting 795 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True 796 797 # test a call that shouldn't fail just as a smoke test 798 try: 799 p = xmlrpclib.ServerProxy(URL) 800 self.assertEqual(p.pow(6,8), 6**8) 801 except (xmlrpclib.ProtocolError, socket.error), e: 802 # ignore failures due to non-blocking socket 'unavailable' errors 803 if not is_unavailable_exception(e): 804 # protocol error; provide additional information in test output 805 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 806 807 def test_fail_no_info(self): 808 # use the broken message class 809 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 810 811 try: 812 p = xmlrpclib.ServerProxy(URL) 813 p.pow(6,8) 814 except (xmlrpclib.ProtocolError, socket.error), e: 815 # ignore failures due to non-blocking socket 'unavailable' errors 816 if not is_unavailable_exception(e) and hasattr(e, "headers"): 817 # The two server-side error headers shouldn't be sent back in this case 818 self.assertTrue(e.headers.get("X-exception") is None) 819 self.assertTrue(e.headers.get("X-traceback") is None) 820 else: 821 self.fail('ProtocolError not raised') 822 823 def test_fail_with_info(self): 824 # use the broken message class 825 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 826 827 # Check that errors in the server send back exception/traceback 828 # info when flag is set 829 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True 830 831 try: 832 p = xmlrpclib.ServerProxy(URL) 833 p.pow(6,8) 834 except (xmlrpclib.ProtocolError, socket.error), e: 835 # ignore failures due to non-blocking socket 'unavailable' errors 836 if not is_unavailable_exception(e) and hasattr(e, "headers"): 837 # We should get error info in the response 838 expected_err = "invalid literal for int() with base 10: 'I am broken'" 839 self.assertEqual(e.headers.get("x-exception"), expected_err) 840 self.assertTrue(e.headers.get("x-traceback") is not None) 841 else: 842 self.fail('ProtocolError not raised') 843 844 class CGIHandlerTestCase(unittest.TestCase): 845 def setUp(self): 846 self.cgi = SimpleXMLRPCServer.CGIXMLRPCRequestHandler() 847 848 def tearDown(self): 849 self.cgi = None 850 851 def test_cgi_get(self): 852 with test_support.EnvironmentVarGuard() as env: 853 env['REQUEST_METHOD'] = 'GET' 854 # if the method is GET and no request_text is given, it runs handle_get 855 # get sysout output 856 with test_support.captured_stdout() as data_out: 857 self.cgi.handle_request() 858 859 # parse Status header 860 data_out.seek(0) 861 handle = data_out.read() 862 status = handle.split()[1] 863 message = ' '.join(handle.split()[2:4]) 864 865 self.assertEqual(status, '400') 866 self.assertEqual(message, 'Bad Request') 867 868 869 def test_cgi_xmlrpc_response(self): 870 data = """<?xml version='1.0'?> 871 <methodCall> 872 <methodName>test_method</methodName> 873 <params> 874 <param> 875 <value><string>foo</string></value> 876 </param> 877 <param> 878 <value><string>bar</string></value> 879 </param> 880 </params> 881 </methodCall> 882 """ 883 884 with test_support.EnvironmentVarGuard() as env, \ 885 test_support.captured_stdout() as data_out, \ 886 test_support.captured_stdin() as data_in: 887 data_in.write(data) 888 data_in.seek(0) 889 env['CONTENT_LENGTH'] = str(len(data)) 890 self.cgi.handle_request() 891 data_out.seek(0) 892 893 # will respond exception, if so, our goal is achieved ;) 894 handle = data_out.read() 895 896 # start with 44th char so as not to get http header, we just need only xml 897 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 898 899 # Also test the content-length returned by handle_request 900 # Using the same test method inorder to avoid all the datapassing 901 # boilerplate code. 902 # Test for bug: http://bugs.python.org/issue5040 903 904 content = handle[handle.find("<?xml"):] 905 906 self.assertEqual( 907 int(re.search('Content-Length: (\d+)', handle).group(1)), 908 len(content)) 909 910 911 class FakeSocket: 912 913 def __init__(self): 914 self.data = StringIO.StringIO() 915 916 def send(self, buf): 917 self.data.write(buf) 918 return len(buf) 919 920 def sendall(self, buf): 921 self.data.write(buf) 922 923 def getvalue(self): 924 return self.data.getvalue() 925 926 def makefile(self, x='r', y=-1): 927 raise RuntimeError 928 929 def close(self): 930 pass 931 932 class FakeTransport(xmlrpclib.Transport): 933 """A Transport instance that records instead of sending a request. 934 935 This class replaces the actual socket used by httplib with a 936 FakeSocket object that records the request. It doesn't provide a 937 response. 938 """ 939 940 def make_connection(self, host): 941 conn = xmlrpclib.Transport.make_connection(self, host) 942 conn.sock = self.fake_socket = FakeSocket() 943 return conn 944 945 class TransportSubclassTestCase(unittest.TestCase): 946 947 def issue_request(self, transport_class): 948 """Return an HTTP request made via transport_class.""" 949 transport = transport_class() 950 proxy = xmlrpclib.ServerProxy("http://example.com/", 951 transport=transport) 952 try: 953 proxy.pow(6, 8) 954 except RuntimeError: 955 return transport.fake_socket.getvalue() 956 return None 957 958 def test_custom_user_agent(self): 959 class TestTransport(FakeTransport): 960 961 def send_user_agent(self, conn): 962 xmlrpclib.Transport.send_user_agent(self, conn) 963 conn.putheader("X-Test", "test_custom_user_agent") 964 965 req = self.issue_request(TestTransport) 966 self.assertIn("X-Test: test_custom_user_agent\r\n", req) 967 968 def test_send_host(self): 969 class TestTransport(FakeTransport): 970 971 def send_host(self, conn, host): 972 xmlrpclib.Transport.send_host(self, conn, host) 973 conn.putheader("X-Test", "test_send_host") 974 975 req = self.issue_request(TestTransport) 976 self.assertIn("X-Test: test_send_host\r\n", req) 977 978 def test_send_request(self): 979 class TestTransport(FakeTransport): 980 981 def send_request(self, conn, url, body): 982 xmlrpclib.Transport.send_request(self, conn, url, body) 983 conn.putheader("X-Test", "test_send_request") 984 985 req = self.issue_request(TestTransport) 986 self.assertIn("X-Test: test_send_request\r\n", req) 987 988 def test_send_content(self): 989 class TestTransport(FakeTransport): 990 991 def send_content(self, conn, body): 992 conn.putheader("X-Test", "test_send_content") 993 xmlrpclib.Transport.send_content(self, conn, body) 994 995 req = self.issue_request(TestTransport) 996 self.assertIn("X-Test: test_send_content\r\n", req) 997 998 @test_support.reap_threads 999 def test_main(): 1000 xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase, 1001 BinaryTestCase, FaultTestCase, TransportSubclassTestCase] 1002 xmlrpc_tests.append(SimpleServerTestCase) 1003 xmlrpc_tests.append(KeepaliveServerTestCase1) 1004 xmlrpc_tests.append(KeepaliveServerTestCase2) 1005 try: 1006 import gzip 1007 xmlrpc_tests.append(GzipServerTestCase) 1008 except ImportError: 1009 pass #gzip not supported in this build 1010 xmlrpc_tests.append(MultiPathServerTestCase) 1011 xmlrpc_tests.append(ServerProxyTestCase) 1012 xmlrpc_tests.append(FailingServerTestCase) 1013 xmlrpc_tests.append(CGIHandlerTestCase) 1014 1015 test_support.run_unittest(*xmlrpc_tests) 1016 1017 if __name__ == "__main__": 1018 test_main() 1019