1 #!/usr/bin/env python 2 3 import unittest 4 from test import test_support 5 6 import errno 7 import socket 8 import select 9 import _testcapi 10 import time 11 import traceback 12 import Queue 13 import sys 14 import os 15 import array 16 import contextlib 17 from weakref import proxy 18 import signal 19 import math 20 21 def try_address(host, port=0, family=socket.AF_INET): 22 """Try to bind a socket on the given host:port and return True 23 if that has been possible.""" 24 try: 25 sock = socket.socket(family, socket.SOCK_STREAM) 26 sock.bind((host, port)) 27 except (socket.error, socket.gaierror): 28 return False 29 else: 30 sock.close() 31 return True 32 33 HOST = test_support.HOST 34 MSG = b'Michael Gilfix was here\n' 35 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6) 36 37 try: 38 import thread 39 import threading 40 except ImportError: 41 thread = None 42 threading = None 43 44 HOST = test_support.HOST 45 MSG = 'Michael Gilfix was here\n' 46 47 class SocketTCPTest(unittest.TestCase): 48 49 def setUp(self): 50 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 51 self.port = test_support.bind_port(self.serv) 52 self.serv.listen(1) 53 54 def tearDown(self): 55 self.serv.close() 56 self.serv = None 57 58 class SocketUDPTest(unittest.TestCase): 59 60 def setUp(self): 61 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 62 self.port = test_support.bind_port(self.serv) 63 64 def tearDown(self): 65 self.serv.close() 66 self.serv = None 67 68 class ThreadableTest: 69 """Threadable Test class 70 71 The ThreadableTest class makes it easy to create a threaded 72 client/server pair from an existing unit test. To create a 73 new threaded class from an existing unit test, use multiple 74 inheritance: 75 76 class NewClass (OldClass, ThreadableTest): 77 pass 78 79 This class defines two new fixture functions with obvious 80 purposes for overriding: 81 82 clientSetUp () 83 clientTearDown () 84 85 Any new test functions within the class must then define 86 tests in pairs, where the test name is preceeded with a 87 '_' to indicate the client portion of the test. Ex: 88 89 def testFoo(self): 90 # Server portion 91 92 def _testFoo(self): 93 # Client portion 94 95 Any exceptions raised by the clients during their tests 96 are caught and transferred to the main thread to alert 97 the testing framework. 98 99 Note, the server setup function cannot call any blocking 100 functions that rely on the client thread during setup, 101 unless serverExplicitReady() is called just before 102 the blocking call (such as in setting up a client/server 103 connection and performing the accept() in setUp(). 104 """ 105 106 def __init__(self): 107 # Swap the true setup function 108 self.__setUp = self.setUp 109 self.__tearDown = self.tearDown 110 self.setUp = self._setUp 111 self.tearDown = self._tearDown 112 113 def serverExplicitReady(self): 114 """This method allows the server to explicitly indicate that 115 it wants the client thread to proceed. This is useful if the 116 server is about to execute a blocking routine that is 117 dependent upon the client thread during its setup routine.""" 118 self.server_ready.set() 119 120 def _setUp(self): 121 self.server_ready = threading.Event() 122 self.client_ready = threading.Event() 123 self.done = threading.Event() 124 self.queue = Queue.Queue(1) 125 126 # Do some munging to start the client test. 127 methodname = self.id() 128 i = methodname.rfind('.') 129 methodname = methodname[i+1:] 130 test_method = getattr(self, '_' + methodname) 131 self.client_thread = thread.start_new_thread( 132 self.clientRun, (test_method,)) 133 134 self.__setUp() 135 if not self.server_ready.is_set(): 136 self.server_ready.set() 137 self.client_ready.wait() 138 139 def _tearDown(self): 140 self.__tearDown() 141 self.done.wait() 142 143 if not self.queue.empty(): 144 msg = self.queue.get() 145 self.fail(msg) 146 147 def clientRun(self, test_func): 148 self.server_ready.wait() 149 self.clientSetUp() 150 self.client_ready.set() 151 if not callable(test_func): 152 raise TypeError("test_func must be a callable function.") 153 try: 154 test_func() 155 except Exception, strerror: 156 self.queue.put(strerror) 157 self.clientTearDown() 158 159 def clientSetUp(self): 160 raise NotImplementedError("clientSetUp must be implemented.") 161 162 def clientTearDown(self): 163 self.done.set() 164 thread.exit() 165 166 class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): 167 168 def __init__(self, methodName='runTest'): 169 SocketTCPTest.__init__(self, methodName=methodName) 170 ThreadableTest.__init__(self) 171 172 def clientSetUp(self): 173 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 174 175 def clientTearDown(self): 176 self.cli.close() 177 self.cli = None 178 ThreadableTest.clientTearDown(self) 179 180 class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): 181 182 def __init__(self, methodName='runTest'): 183 SocketUDPTest.__init__(self, methodName=methodName) 184 ThreadableTest.__init__(self) 185 186 def clientSetUp(self): 187 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 188 189 def clientTearDown(self): 190 self.cli.close() 191 self.cli = None 192 ThreadableTest.clientTearDown(self) 193 194 class SocketConnectedTest(ThreadedTCPSocketTest): 195 196 def __init__(self, methodName='runTest'): 197 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 198 199 def setUp(self): 200 ThreadedTCPSocketTest.setUp(self) 201 # Indicate explicitly we're ready for the client thread to 202 # proceed and then perform the blocking call to accept 203 self.serverExplicitReady() 204 conn, addr = self.serv.accept() 205 self.cli_conn = conn 206 207 def tearDown(self): 208 self.cli_conn.close() 209 self.cli_conn = None 210 ThreadedTCPSocketTest.tearDown(self) 211 212 def clientSetUp(self): 213 ThreadedTCPSocketTest.clientSetUp(self) 214 self.cli.connect((HOST, self.port)) 215 self.serv_conn = self.cli 216 217 def clientTearDown(self): 218 self.serv_conn.close() 219 self.serv_conn = None 220 ThreadedTCPSocketTest.clientTearDown(self) 221 222 class SocketPairTest(unittest.TestCase, ThreadableTest): 223 224 def __init__(self, methodName='runTest'): 225 unittest.TestCase.__init__(self, methodName=methodName) 226 ThreadableTest.__init__(self) 227 228 def setUp(self): 229 self.serv, self.cli = socket.socketpair() 230 231 def tearDown(self): 232 self.serv.close() 233 self.serv = None 234 235 def clientSetUp(self): 236 pass 237 238 def clientTearDown(self): 239 self.cli.close() 240 self.cli = None 241 ThreadableTest.clientTearDown(self) 242 243 244 ####################################################################### 245 ## Begin Tests 246 247 class GeneralModuleTests(unittest.TestCase): 248 249 def test_weakref(self): 250 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 251 p = proxy(s) 252 self.assertEqual(p.fileno(), s.fileno()) 253 s.close() 254 s = None 255 try: 256 p.fileno() 257 except ReferenceError: 258 pass 259 else: 260 self.fail('Socket proxy still exists') 261 262 def testSocketError(self): 263 # Testing socket module exceptions 264 def raise_error(*args, **kwargs): 265 raise socket.error 266 def raise_herror(*args, **kwargs): 267 raise socket.herror 268 def raise_gaierror(*args, **kwargs): 269 raise socket.gaierror 270 self.assertRaises(socket.error, raise_error, 271 "Error raising socket exception.") 272 self.assertRaises(socket.error, raise_herror, 273 "Error raising socket exception.") 274 self.assertRaises(socket.error, raise_gaierror, 275 "Error raising socket exception.") 276 277 def testSendtoErrors(self): 278 # Testing that sendto doens't masks failures. See #10169. 279 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 280 self.addCleanup(s.close) 281 s.bind(('', 0)) 282 sockname = s.getsockname() 283 # 2 args 284 with self.assertRaises(UnicodeEncodeError): 285 s.sendto(u'\u2620', sockname) 286 with self.assertRaises(TypeError) as cm: 287 s.sendto(5j, sockname) 288 self.assertIn('not complex', str(cm.exception)) 289 with self.assertRaises(TypeError) as cm: 290 s.sendto('foo', None) 291 self.assertIn('not NoneType', str(cm.exception)) 292 # 3 args 293 with self.assertRaises(UnicodeEncodeError): 294 s.sendto(u'\u2620', 0, sockname) 295 with self.assertRaises(TypeError) as cm: 296 s.sendto(5j, 0, sockname) 297 self.assertIn('not complex', str(cm.exception)) 298 with self.assertRaises(TypeError) as cm: 299 s.sendto('foo', 0, None) 300 self.assertIn('not NoneType', str(cm.exception)) 301 with self.assertRaises(TypeError) as cm: 302 s.sendto('foo', 'bar', sockname) 303 self.assertIn('an integer is required', str(cm.exception)) 304 with self.assertRaises(TypeError) as cm: 305 s.sendto('foo', None, None) 306 self.assertIn('an integer is required', str(cm.exception)) 307 # wrong number of args 308 with self.assertRaises(TypeError) as cm: 309 s.sendto('foo') 310 self.assertIn('(1 given)', str(cm.exception)) 311 with self.assertRaises(TypeError) as cm: 312 s.sendto('foo', 0, sockname, 4) 313 self.assertIn('(4 given)', str(cm.exception)) 314 315 316 def testCrucialConstants(self): 317 # Testing for mission critical constants 318 socket.AF_INET 319 socket.SOCK_STREAM 320 socket.SOCK_DGRAM 321 socket.SOCK_RAW 322 socket.SOCK_RDM 323 socket.SOCK_SEQPACKET 324 socket.SOL_SOCKET 325 socket.SO_REUSEADDR 326 327 def testHostnameRes(self): 328 # Testing hostname resolution mechanisms 329 hostname = socket.gethostname() 330 try: 331 ip = socket.gethostbyname(hostname) 332 except socket.error: 333 # Probably name lookup wasn't set up right; skip this test 334 return 335 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 336 try: 337 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) 338 except socket.error: 339 # Probably a similar problem as above; skip this test 340 return 341 all_host_names = [hostname, hname] + aliases 342 fqhn = socket.getfqdn(ip) 343 if not fqhn in all_host_names: 344 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) 345 346 def testRefCountGetNameInfo(self): 347 # Testing reference count for getnameinfo 348 if hasattr(sys, "getrefcount"): 349 try: 350 # On some versions, this loses a reference 351 orig = sys.getrefcount(__name__) 352 socket.getnameinfo(__name__,0) 353 except TypeError: 354 self.assertEqual(sys.getrefcount(__name__), orig, 355 "socket.getnameinfo loses a reference") 356 357 def testInterpreterCrash(self): 358 # Making sure getnameinfo doesn't crash the interpreter 359 try: 360 # On some versions, this crashes the interpreter. 361 socket.getnameinfo(('x', 0, 0, 0), 0) 362 except socket.error: 363 pass 364 365 def testNtoH(self): 366 # This just checks that htons etc. are their own inverse, 367 # when looking at the lower 16 or 32 bits. 368 sizes = {socket.htonl: 32, socket.ntohl: 32, 369 socket.htons: 16, socket.ntohs: 16} 370 for func, size in sizes.items(): 371 mask = (1L<<size) - 1 372 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): 373 self.assertEqual(i & mask, func(func(i&mask)) & mask) 374 375 swapped = func(mask) 376 self.assertEqual(swapped & mask, mask) 377 self.assertRaises(OverflowError, func, 1L<<34) 378 379 def testNtoHErrors(self): 380 good_values = [ 1, 2, 3, 1L, 2L, 3L ] 381 bad_values = [ -1, -2, -3, -1L, -2L, -3L ] 382 for k in good_values: 383 socket.ntohl(k) 384 socket.ntohs(k) 385 socket.htonl(k) 386 socket.htons(k) 387 for k in bad_values: 388 self.assertRaises(OverflowError, socket.ntohl, k) 389 self.assertRaises(OverflowError, socket.ntohs, k) 390 self.assertRaises(OverflowError, socket.htonl, k) 391 self.assertRaises(OverflowError, socket.htons, k) 392 393 def testGetServBy(self): 394 eq = self.assertEqual 395 # Find one service that exists, then check all the related interfaces. 396 # I've ordered this by protocols that have both a tcp and udp 397 # protocol, at least for modern Linuxes. 398 if (sys.platform.startswith('linux') or 399 sys.platform.startswith('freebsd') or 400 sys.platform.startswith('netbsd') or 401 sys.platform == 'darwin'): 402 # avoid the 'echo' service on this platform, as there is an 403 # assumption breaking non-standard port/protocol entry 404 services = ('daytime', 'qotd', 'domain') 405 else: 406 services = ('echo', 'daytime', 'domain') 407 for service in services: 408 try: 409 port = socket.getservbyname(service, 'tcp') 410 break 411 except socket.error: 412 pass 413 else: 414 raise socket.error 415 # Try same call with optional protocol omitted 416 port2 = socket.getservbyname(service) 417 eq(port, port2) 418 # Try udp, but don't barf if it doesn't exist 419 try: 420 udpport = socket.getservbyname(service, 'udp') 421 except socket.error: 422 udpport = None 423 else: 424 eq(udpport, port) 425 # Now make sure the lookup by port returns the same service name 426 eq(socket.getservbyport(port2), service) 427 eq(socket.getservbyport(port, 'tcp'), service) 428 if udpport is not None: 429 eq(socket.getservbyport(udpport, 'udp'), service) 430 # Make sure getservbyport does not accept out of range ports. 431 self.assertRaises(OverflowError, socket.getservbyport, -1) 432 self.assertRaises(OverflowError, socket.getservbyport, 65536) 433 434 def testDefaultTimeout(self): 435 # Testing default timeout 436 # The default timeout should initially be None 437 self.assertEqual(socket.getdefaulttimeout(), None) 438 s = socket.socket() 439 self.assertEqual(s.gettimeout(), None) 440 s.close() 441 442 # Set the default timeout to 10, and see if it propagates 443 socket.setdefaulttimeout(10) 444 self.assertEqual(socket.getdefaulttimeout(), 10) 445 s = socket.socket() 446 self.assertEqual(s.gettimeout(), 10) 447 s.close() 448 449 # Reset the default timeout to None, and see if it propagates 450 socket.setdefaulttimeout(None) 451 self.assertEqual(socket.getdefaulttimeout(), None) 452 s = socket.socket() 453 self.assertEqual(s.gettimeout(), None) 454 s.close() 455 456 # Check that setting it to an invalid value raises ValueError 457 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 458 459 # Check that setting it to an invalid type raises TypeError 460 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 461 462 def testIPv4_inet_aton_fourbytes(self): 463 if not hasattr(socket, 'inet_aton'): 464 return # No inet_aton, nothing to check 465 # Test that issue1008086 and issue767150 are fixed. 466 # It must return 4 bytes. 467 self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0')) 468 self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255')) 469 470 def testIPv4toString(self): 471 if not hasattr(socket, 'inet_pton'): 472 return # No inet_pton() on this platform 473 from socket import inet_aton as f, inet_pton, AF_INET 474 g = lambda a: inet_pton(AF_INET, a) 475 476 self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0')) 477 self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0')) 478 self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170')) 479 self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4')) 480 self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255')) 481 482 self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0')) 483 self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0')) 484 self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170')) 485 self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255')) 486 487 def testIPv6toString(self): 488 if not hasattr(socket, 'inet_pton'): 489 return # No inet_pton() on this platform 490 try: 491 from socket import inet_pton, AF_INET6, has_ipv6 492 if not has_ipv6: 493 return 494 except ImportError: 495 return 496 f = lambda a: inet_pton(AF_INET6, a) 497 498 self.assertEqual('\x00' * 16, f('::')) 499 self.assertEqual('\x00' * 16, f('0::0')) 500 self.assertEqual('\x00\x01' + '\x00' * 14, f('1::')) 501 self.assertEqual( 502 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 503 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 504 ) 505 506 def testStringToIPv4(self): 507 if not hasattr(socket, 'inet_ntop'): 508 return # No inet_ntop() on this platform 509 from socket import inet_ntoa as f, inet_ntop, AF_INET 510 g = lambda a: inet_ntop(AF_INET, a) 511 512 self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00')) 513 self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55')) 514 self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff')) 515 self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04')) 516 517 self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00')) 518 self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55')) 519 self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff')) 520 521 def testStringToIPv6(self): 522 if not hasattr(socket, 'inet_ntop'): 523 return # No inet_ntop() on this platform 524 try: 525 from socket import inet_ntop, AF_INET6, has_ipv6 526 if not has_ipv6: 527 return 528 except ImportError: 529 return 530 f = lambda a: inet_ntop(AF_INET6, a) 531 532 self.assertEqual('::', f('\x00' * 16)) 533 self.assertEqual('::1', f('\x00' * 15 + '\x01')) 534 self.assertEqual( 535 'aef:b01:506:1001:ffff:9997:55:170', 536 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 537 ) 538 539 # XXX The following don't test module-level functionality... 540 541 def _get_unused_port(self, bind_address='0.0.0.0'): 542 """Use a temporary socket to elicit an unused ephemeral port. 543 544 Args: 545 bind_address: Hostname or IP address to search for a port on. 546 547 Returns: A most likely to be unused port. 548 """ 549 tempsock = socket.socket() 550 tempsock.bind((bind_address, 0)) 551 host, port = tempsock.getsockname() 552 tempsock.close() 553 return port 554 555 def testSockName(self): 556 # Testing getsockname() 557 port = self._get_unused_port() 558 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 559 self.addCleanup(sock.close) 560 sock.bind(("0.0.0.0", port)) 561 name = sock.getsockname() 562 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate 563 # it reasonable to get the host's addr in addition to 0.0.0.0. 564 # At least for eCos. This is required for the S/390 to pass. 565 try: 566 my_ip_addr = socket.gethostbyname(socket.gethostname()) 567 except socket.error: 568 # Probably name lookup wasn't set up right; skip this test 569 return 570 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 571 self.assertEqual(name[1], port) 572 573 def testGetSockOpt(self): 574 # Testing getsockopt() 575 # We know a socket should start without reuse==0 576 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 577 self.addCleanup(sock.close) 578 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 579 self.assertFalse(reuse != 0, "initial mode is reuse") 580 581 def testSetSockOpt(self): 582 # Testing setsockopt() 583 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 584 self.addCleanup(sock.close) 585 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 586 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 587 self.assertFalse(reuse == 0, "failed to set reuse mode") 588 589 def testSendAfterClose(self): 590 # testing send() after close() with timeout 591 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 592 sock.settimeout(1) 593 sock.close() 594 self.assertRaises(socket.error, sock.send, "spam") 595 596 def testNewAttributes(self): 597 # testing .family, .type and .protocol 598 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 599 self.assertEqual(sock.family, socket.AF_INET) 600 self.assertEqual(sock.type, socket.SOCK_STREAM) 601 self.assertEqual(sock.proto, 0) 602 sock.close() 603 604 def test_getsockaddrarg(self): 605 host = '0.0.0.0' 606 port = self._get_unused_port(bind_address=host) 607 big_port = port + 65536 608 neg_port = port - 65536 609 sock = socket.socket() 610 try: 611 self.assertRaises(OverflowError, sock.bind, (host, big_port)) 612 self.assertRaises(OverflowError, sock.bind, (host, neg_port)) 613 sock.bind((host, port)) 614 finally: 615 sock.close() 616 617 @unittest.skipUnless(os.name == "nt", "Windows specific") 618 def test_sock_ioctl(self): 619 self.assertTrue(hasattr(socket.socket, 'ioctl')) 620 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 621 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 622 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 623 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 624 s = socket.socket() 625 self.addCleanup(s.close) 626 self.assertRaises(ValueError, s.ioctl, -1, None) 627 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 628 629 def testGetaddrinfo(self): 630 try: 631 socket.getaddrinfo('localhost', 80) 632 except socket.gaierror as err: 633 if err.errno == socket.EAI_SERVICE: 634 # see http://bugs.python.org/issue1282647 635 self.skipTest("buggy libc version") 636 raise 637 # len of every sequence is supposed to be == 5 638 for info in socket.getaddrinfo(HOST, None): 639 self.assertEqual(len(info), 5) 640 # host can be a domain name, a string representation of an 641 # IPv4/v6 address or None 642 socket.getaddrinfo('localhost', 80) 643 socket.getaddrinfo('127.0.0.1', 80) 644 socket.getaddrinfo(None, 80) 645 if SUPPORTS_IPV6: 646 socket.getaddrinfo('::1', 80) 647 # port can be a string service name such as "http", a numeric 648 # port number (int or long), or None 649 socket.getaddrinfo(HOST, "http") 650 socket.getaddrinfo(HOST, 80) 651 socket.getaddrinfo(HOST, 80L) 652 socket.getaddrinfo(HOST, None) 653 # test family and socktype filters 654 infos = socket.getaddrinfo(HOST, None, socket.AF_INET) 655 for family, _, _, _, _ in infos: 656 self.assertEqual(family, socket.AF_INET) 657 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 658 for _, socktype, _, _, _ in infos: 659 self.assertEqual(socktype, socket.SOCK_STREAM) 660 # test proto and flags arguments 661 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 662 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 663 # a server willing to support both IPv4 and IPv6 will 664 # usually do this 665 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 666 socket.AI_PASSIVE) 667 668 669 def check_sendall_interrupted(self, with_timeout): 670 # socketpair() is not stricly required, but it makes things easier. 671 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 672 self.skipTest("signal.alarm and socket.socketpair required for this test") 673 # Our signal handlers clobber the C errno by calling a math function 674 # with an invalid domain value. 675 def ok_handler(*args): 676 self.assertRaises(ValueError, math.acosh, 0) 677 def raising_handler(*args): 678 self.assertRaises(ValueError, math.acosh, 0) 679 1 // 0 680 c, s = socket.socketpair() 681 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 682 try: 683 if with_timeout: 684 # Just above the one second minimum for signal.alarm 685 c.settimeout(1.5) 686 with self.assertRaises(ZeroDivisionError): 687 signal.alarm(1) 688 c.sendall(b"x" * (1024**2)) 689 if with_timeout: 690 signal.signal(signal.SIGALRM, ok_handler) 691 signal.alarm(1) 692 self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2)) 693 finally: 694 signal.signal(signal.SIGALRM, old_alarm) 695 c.close() 696 s.close() 697 698 def test_sendall_interrupted(self): 699 self.check_sendall_interrupted(False) 700 701 def test_sendall_interrupted_with_timeout(self): 702 self.check_sendall_interrupted(True) 703 704 def test_listen_backlog(self): 705 for backlog in 0, -1: 706 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 707 srv.bind((HOST, 0)) 708 srv.listen(backlog) 709 srv.close() 710 711 # Issue 15989 712 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 713 srv.bind((HOST, 0)) 714 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 715 srv.close() 716 717 @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.') 718 def test_flowinfo(self): 719 self.assertRaises(OverflowError, socket.getnameinfo, 720 ('::1',0, 0xffffffff), 0) 721 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 722 try: 723 self.assertRaises(OverflowError, s.bind, ('::1', 0, -10)) 724 finally: 725 s.close() 726 727 728 @unittest.skipUnless(thread, 'Threading required for this test.') 729 class BasicTCPTest(SocketConnectedTest): 730 731 def __init__(self, methodName='runTest'): 732 SocketConnectedTest.__init__(self, methodName=methodName) 733 734 def testRecv(self): 735 # Testing large receive over TCP 736 msg = self.cli_conn.recv(1024) 737 self.assertEqual(msg, MSG) 738 739 def _testRecv(self): 740 self.serv_conn.send(MSG) 741 742 def testOverFlowRecv(self): 743 # Testing receive in chunks over TCP 744 seg1 = self.cli_conn.recv(len(MSG) - 3) 745 seg2 = self.cli_conn.recv(1024) 746 msg = seg1 + seg2 747 self.assertEqual(msg, MSG) 748 749 def _testOverFlowRecv(self): 750 self.serv_conn.send(MSG) 751 752 def testRecvFrom(self): 753 # Testing large recvfrom() over TCP 754 msg, addr = self.cli_conn.recvfrom(1024) 755 self.assertEqual(msg, MSG) 756 757 def _testRecvFrom(self): 758 self.serv_conn.send(MSG) 759 760 def testOverFlowRecvFrom(self): 761 # Testing recvfrom() in chunks over TCP 762 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 763 seg2, addr = self.cli_conn.recvfrom(1024) 764 msg = seg1 + seg2 765 self.assertEqual(msg, MSG) 766 767 def _testOverFlowRecvFrom(self): 768 self.serv_conn.send(MSG) 769 770 def testSendAll(self): 771 # Testing sendall() with a 2048 byte string over TCP 772 msg = '' 773 while 1: 774 read = self.cli_conn.recv(1024) 775 if not read: 776 break 777 msg += read 778 self.assertEqual(msg, 'f' * 2048) 779 780 def _testSendAll(self): 781 big_chunk = 'f' * 2048 782 self.serv_conn.sendall(big_chunk) 783 784 def testFromFd(self): 785 # Testing fromfd() 786 if not hasattr(socket, "fromfd"): 787 return # On Windows, this doesn't exist 788 fd = self.cli_conn.fileno() 789 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 790 self.addCleanup(sock.close) 791 msg = sock.recv(1024) 792 self.assertEqual(msg, MSG) 793 794 def _testFromFd(self): 795 self.serv_conn.send(MSG) 796 797 def testDup(self): 798 # Testing dup() 799 sock = self.cli_conn.dup() 800 self.addCleanup(sock.close) 801 msg = sock.recv(1024) 802 self.assertEqual(msg, MSG) 803 804 def _testDup(self): 805 self.serv_conn.send(MSG) 806 807 def testShutdown(self): 808 # Testing shutdown() 809 msg = self.cli_conn.recv(1024) 810 self.assertEqual(msg, MSG) 811 # wait for _testShutdown to finish: on OS X, when the server 812 # closes the connection the client also becomes disconnected, 813 # and the client's shutdown call will fail. (Issue #4397.) 814 self.done.wait() 815 816 def _testShutdown(self): 817 self.serv_conn.send(MSG) 818 # Issue 15989 819 self.assertRaises(OverflowError, self.serv_conn.shutdown, 820 _testcapi.INT_MAX + 1) 821 self.assertRaises(OverflowError, self.serv_conn.shutdown, 822 2 + (_testcapi.UINT_MAX + 1)) 823 self.serv_conn.shutdown(2) 824 825 @unittest.skipUnless(thread, 'Threading required for this test.') 826 class BasicUDPTest(ThreadedUDPSocketTest): 827 828 def __init__(self, methodName='runTest'): 829 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 830 831 def testSendtoAndRecv(self): 832 # Testing sendto() and Recv() over UDP 833 msg = self.serv.recv(len(MSG)) 834 self.assertEqual(msg, MSG) 835 836 def _testSendtoAndRecv(self): 837 self.cli.sendto(MSG, 0, (HOST, self.port)) 838 839 def testRecvFrom(self): 840 # Testing recvfrom() over UDP 841 msg, addr = self.serv.recvfrom(len(MSG)) 842 self.assertEqual(msg, MSG) 843 844 def _testRecvFrom(self): 845 self.cli.sendto(MSG, 0, (HOST, self.port)) 846 847 def testRecvFromNegative(self): 848 # Negative lengths passed to recvfrom should give ValueError. 849 self.assertRaises(ValueError, self.serv.recvfrom, -1) 850 851 def _testRecvFromNegative(self): 852 self.cli.sendto(MSG, 0, (HOST, self.port)) 853 854 @unittest.skipUnless(thread, 'Threading required for this test.') 855 class TCPCloserTest(ThreadedTCPSocketTest): 856 857 def testClose(self): 858 conn, addr = self.serv.accept() 859 conn.close() 860 861 sd = self.cli 862 read, write, err = select.select([sd], [], [], 1.0) 863 self.assertEqual(read, [sd]) 864 self.assertEqual(sd.recv(1), '') 865 866 def _testClose(self): 867 self.cli.connect((HOST, self.port)) 868 time.sleep(1.0) 869 870 @unittest.skipUnless(thread, 'Threading required for this test.') 871 class BasicSocketPairTest(SocketPairTest): 872 873 def __init__(self, methodName='runTest'): 874 SocketPairTest.__init__(self, methodName=methodName) 875 876 def testRecv(self): 877 msg = self.serv.recv(1024) 878 self.assertEqual(msg, MSG) 879 880 def _testRecv(self): 881 self.cli.send(MSG) 882 883 def testSend(self): 884 self.serv.send(MSG) 885 886 def _testSend(self): 887 msg = self.cli.recv(1024) 888 self.assertEqual(msg, MSG) 889 890 @unittest.skipUnless(thread, 'Threading required for this test.') 891 class NonBlockingTCPTests(ThreadedTCPSocketTest): 892 893 def __init__(self, methodName='runTest'): 894 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 895 896 def testSetBlocking(self): 897 # Testing whether set blocking works 898 self.serv.setblocking(True) 899 self.assertIsNone(self.serv.gettimeout()) 900 self.serv.setblocking(False) 901 self.assertEqual(self.serv.gettimeout(), 0.0) 902 start = time.time() 903 try: 904 self.serv.accept() 905 except socket.error: 906 pass 907 end = time.time() 908 self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") 909 # Issue 15989 910 if _testcapi.UINT_MAX < _testcapi.ULONG_MAX: 911 self.serv.setblocking(_testcapi.UINT_MAX + 1) 912 self.assertIsNone(self.serv.gettimeout()) 913 914 def _testSetBlocking(self): 915 pass 916 917 def testAccept(self): 918 # Testing non-blocking accept 919 self.serv.setblocking(0) 920 try: 921 conn, addr = self.serv.accept() 922 except socket.error: 923 pass 924 else: 925 self.fail("Error trying to do non-blocking accept.") 926 read, write, err = select.select([self.serv], [], []) 927 if self.serv in read: 928 conn, addr = self.serv.accept() 929 conn.close() 930 else: 931 self.fail("Error trying to do accept after select.") 932 933 def _testAccept(self): 934 time.sleep(0.1) 935 self.cli.connect((HOST, self.port)) 936 937 def testConnect(self): 938 # Testing non-blocking connect 939 conn, addr = self.serv.accept() 940 conn.close() 941 942 def _testConnect(self): 943 self.cli.settimeout(10) 944 self.cli.connect((HOST, self.port)) 945 946 def testRecv(self): 947 # Testing non-blocking recv 948 conn, addr = self.serv.accept() 949 conn.setblocking(0) 950 try: 951 msg = conn.recv(len(MSG)) 952 except socket.error: 953 pass 954 else: 955 self.fail("Error trying to do non-blocking recv.") 956 read, write, err = select.select([conn], [], []) 957 if conn in read: 958 msg = conn.recv(len(MSG)) 959 conn.close() 960 self.assertEqual(msg, MSG) 961 else: 962 self.fail("Error during select call to non-blocking socket.") 963 964 def _testRecv(self): 965 self.cli.connect((HOST, self.port)) 966 time.sleep(0.1) 967 self.cli.send(MSG) 968 969 @unittest.skipUnless(thread, 'Threading required for this test.') 970 class FileObjectClassTestCase(SocketConnectedTest): 971 972 bufsize = -1 # Use default buffer size 973 974 def __init__(self, methodName='runTest'): 975 SocketConnectedTest.__init__(self, methodName=methodName) 976 977 def setUp(self): 978 SocketConnectedTest.setUp(self) 979 self.serv_file = self.cli_conn.makefile('rb', self.bufsize) 980 981 def tearDown(self): 982 self.serv_file.close() 983 self.assertTrue(self.serv_file.closed) 984 SocketConnectedTest.tearDown(self) 985 self.serv_file = None 986 987 def clientSetUp(self): 988 SocketConnectedTest.clientSetUp(self) 989 self.cli_file = self.serv_conn.makefile('wb') 990 991 def clientTearDown(self): 992 self.cli_file.close() 993 self.assertTrue(self.cli_file.closed) 994 self.cli_file = None 995 SocketConnectedTest.clientTearDown(self) 996 997 def testSmallRead(self): 998 # Performing small file read test 999 first_seg = self.serv_file.read(len(MSG)-3) 1000 second_seg = self.serv_file.read(3) 1001 msg = first_seg + second_seg 1002 self.assertEqual(msg, MSG) 1003 1004 def _testSmallRead(self): 1005 self.cli_file.write(MSG) 1006 self.cli_file.flush() 1007 1008 def testFullRead(self): 1009 # read until EOF 1010 msg = self.serv_file.read() 1011 self.assertEqual(msg, MSG) 1012 1013 def _testFullRead(self): 1014 self.cli_file.write(MSG) 1015 self.cli_file.close() 1016 1017 def testUnbufferedRead(self): 1018 # Performing unbuffered file read test 1019 buf = '' 1020 while 1: 1021 char = self.serv_file.read(1) 1022 if not char: 1023 break 1024 buf += char 1025 self.assertEqual(buf, MSG) 1026 1027 def _testUnbufferedRead(self): 1028 self.cli_file.write(MSG) 1029 self.cli_file.flush() 1030 1031 def testReadline(self): 1032 # Performing file readline test 1033 line = self.serv_file.readline() 1034 self.assertEqual(line, MSG) 1035 1036 def _testReadline(self): 1037 self.cli_file.write(MSG) 1038 self.cli_file.flush() 1039 1040 def testReadlineAfterRead(self): 1041 a_baloo_is = self.serv_file.read(len("A baloo is")) 1042 self.assertEqual("A baloo is", a_baloo_is) 1043 _a_bear = self.serv_file.read(len(" a bear")) 1044 self.assertEqual(" a bear", _a_bear) 1045 line = self.serv_file.readline() 1046 self.assertEqual("\n", line) 1047 line = self.serv_file.readline() 1048 self.assertEqual("A BALOO IS A BEAR.\n", line) 1049 line = self.serv_file.readline() 1050 self.assertEqual(MSG, line) 1051 1052 def _testReadlineAfterRead(self): 1053 self.cli_file.write("A baloo is a bear\n") 1054 self.cli_file.write("A BALOO IS A BEAR.\n") 1055 self.cli_file.write(MSG) 1056 self.cli_file.flush() 1057 1058 def testReadlineAfterReadNoNewline(self): 1059 end_of_ = self.serv_file.read(len("End Of ")) 1060 self.assertEqual("End Of ", end_of_) 1061 line = self.serv_file.readline() 1062 self.assertEqual("Line", line) 1063 1064 def _testReadlineAfterReadNoNewline(self): 1065 self.cli_file.write("End Of Line") 1066 1067 def testClosedAttr(self): 1068 self.assertTrue(not self.serv_file.closed) 1069 1070 def _testClosedAttr(self): 1071 self.assertTrue(not self.cli_file.closed) 1072 1073 1074 class FileObjectInterruptedTestCase(unittest.TestCase): 1075 """Test that the file object correctly handles EINTR internally.""" 1076 1077 class MockSocket(object): 1078 def __init__(self, recv_funcs=()): 1079 # A generator that returns callables that we'll call for each 1080 # call to recv(). 1081 self._recv_step = iter(recv_funcs) 1082 1083 def recv(self, size): 1084 return self._recv_step.next()() 1085 1086 @staticmethod 1087 def _raise_eintr(): 1088 raise socket.error(errno.EINTR) 1089 1090 def _test_readline(self, size=-1, **kwargs): 1091 mock_sock = self.MockSocket(recv_funcs=[ 1092 lambda : "This is the first line\nAnd the sec", 1093 self._raise_eintr, 1094 lambda : "ond line is here\n", 1095 lambda : "", 1096 ]) 1097 fo = socket._fileobject(mock_sock, **kwargs) 1098 self.assertEqual(fo.readline(size), "This is the first line\n") 1099 self.assertEqual(fo.readline(size), "And the second line is here\n") 1100 1101 def _test_read(self, size=-1, **kwargs): 1102 mock_sock = self.MockSocket(recv_funcs=[ 1103 lambda : "This is the first line\nAnd the sec", 1104 self._raise_eintr, 1105 lambda : "ond line is here\n", 1106 lambda : "", 1107 ]) 1108 fo = socket._fileobject(mock_sock, **kwargs) 1109 self.assertEqual(fo.read(size), "This is the first line\n" 1110 "And the second line is here\n") 1111 1112 def test_default(self): 1113 self._test_readline() 1114 self._test_readline(size=100) 1115 self._test_read() 1116 self._test_read(size=100) 1117 1118 def test_with_1k_buffer(self): 1119 self._test_readline(bufsize=1024) 1120 self._test_readline(size=100, bufsize=1024) 1121 self._test_read(bufsize=1024) 1122 self._test_read(size=100, bufsize=1024) 1123 1124 def _test_readline_no_buffer(self, size=-1): 1125 mock_sock = self.MockSocket(recv_funcs=[ 1126 lambda : "aa", 1127 lambda : "\n", 1128 lambda : "BB", 1129 self._raise_eintr, 1130 lambda : "bb", 1131 lambda : "", 1132 ]) 1133 fo = socket._fileobject(mock_sock, bufsize=0) 1134 self.assertEqual(fo.readline(size), "aa\n") 1135 self.assertEqual(fo.readline(size), "BBbb") 1136 1137 def test_no_buffer(self): 1138 self._test_readline_no_buffer() 1139 self._test_readline_no_buffer(size=4) 1140 self._test_read(bufsize=0) 1141 self._test_read(size=100, bufsize=0) 1142 1143 1144 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): 1145 1146 """Repeat the tests from FileObjectClassTestCase with bufsize==0. 1147 1148 In this case (and in this case only), it should be possible to 1149 create a file object, read a line from it, create another file 1150 object, read another line from it, without loss of data in the 1151 first file object's buffer. Note that httplib relies on this 1152 when reading multiple requests from the same socket.""" 1153 1154 bufsize = 0 # Use unbuffered mode 1155 1156 def testUnbufferedReadline(self): 1157 # Read a line, create a new file object, read another line with it 1158 line = self.serv_file.readline() # first line 1159 self.assertEqual(line, "A. " + MSG) # first line 1160 self.serv_file = self.cli_conn.makefile('rb', 0) 1161 line = self.serv_file.readline() # second line 1162 self.assertEqual(line, "B. " + MSG) # second line 1163 1164 def _testUnbufferedReadline(self): 1165 self.cli_file.write("A. " + MSG) 1166 self.cli_file.write("B. " + MSG) 1167 self.cli_file.flush() 1168 1169 class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): 1170 1171 bufsize = 1 # Default-buffered for reading; line-buffered for writing 1172 1173 class SocketMemo(object): 1174 """A wrapper to keep track of sent data, needed to examine write behaviour""" 1175 def __init__(self, sock): 1176 self._sock = sock 1177 self.sent = [] 1178 1179 def send(self, data, flags=0): 1180 n = self._sock.send(data, flags) 1181 self.sent.append(data[:n]) 1182 return n 1183 1184 def sendall(self, data, flags=0): 1185 self._sock.sendall(data, flags) 1186 self.sent.append(data) 1187 1188 def __getattr__(self, attr): 1189 return getattr(self._sock, attr) 1190 1191 def getsent(self): 1192 return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent] 1193 1194 def setUp(self): 1195 FileObjectClassTestCase.setUp(self) 1196 self.serv_file._sock = self.SocketMemo(self.serv_file._sock) 1197 1198 def testLinebufferedWrite(self): 1199 # Write two lines, in small chunks 1200 msg = MSG.strip() 1201 print >> self.serv_file, msg, 1202 print >> self.serv_file, msg 1203 1204 # second line: 1205 print >> self.serv_file, msg, 1206 print >> self.serv_file, msg, 1207 print >> self.serv_file, msg 1208 1209 # third line 1210 print >> self.serv_file, '' 1211 1212 self.serv_file.flush() 1213 1214 msg1 = "%s %s\n"%(msg, msg) 1215 msg2 = "%s %s %s\n"%(msg, msg, msg) 1216 msg3 = "\n" 1217 self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3]) 1218 1219 def _testLinebufferedWrite(self): 1220 msg = MSG.strip() 1221 msg1 = "%s %s\n"%(msg, msg) 1222 msg2 = "%s %s %s\n"%(msg, msg, msg) 1223 msg3 = "\n" 1224 l1 = self.cli_file.readline() 1225 self.assertEqual(l1, msg1) 1226 l2 = self.cli_file.readline() 1227 self.assertEqual(l2, msg2) 1228 l3 = self.cli_file.readline() 1229 self.assertEqual(l3, msg3) 1230 1231 1232 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): 1233 1234 bufsize = 2 # Exercise the buffering code 1235 1236 1237 class NetworkConnectionTest(object): 1238 """Prove network connection.""" 1239 def clientSetUp(self): 1240 # We're inherited below by BasicTCPTest2, which also inherits 1241 # BasicTCPTest, which defines self.port referenced below. 1242 self.cli = socket.create_connection((HOST, self.port)) 1243 self.serv_conn = self.cli 1244 1245 class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): 1246 """Tests that NetworkConnection does not break existing TCP functionality. 1247 """ 1248 1249 class NetworkConnectionNoServer(unittest.TestCase): 1250 class MockSocket(socket.socket): 1251 def connect(self, *args): 1252 raise socket.timeout('timed out') 1253 1254 @contextlib.contextmanager 1255 def mocked_socket_module(self): 1256 """Return a socket which times out on connect""" 1257 old_socket = socket.socket 1258 socket.socket = self.MockSocket 1259 try: 1260 yield 1261 finally: 1262 socket.socket = old_socket 1263 1264 def test_connect(self): 1265 port = test_support.find_unused_port() 1266 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1267 self.addCleanup(cli.close) 1268 with self.assertRaises(socket.error) as cm: 1269 cli.connect((HOST, port)) 1270 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 1271 1272 def test_create_connection(self): 1273 # Issue #9792: errors raised by create_connection() should have 1274 # a proper errno attribute. 1275 port = test_support.find_unused_port() 1276 with self.assertRaises(socket.error) as cm: 1277 socket.create_connection((HOST, port)) 1278 1279 # Issue #16257: create_connection() calls getaddrinfo() against 1280 # 'localhost'. This may result in an IPV6 addr being returned 1281 # as well as an IPV4 one: 1282 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 1283 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 1284 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 1285 # 1286 # create_connection() enumerates through all the addresses returned 1287 # and if it doesn't successfully bind to any of them, it propagates 1288 # the last exception it encountered. 1289 # 1290 # On Solaris, ENETUNREACH is returned in this circumstance instead 1291 # of ECONNREFUSED. So, if that errno exists, add it to our list of 1292 # expected errnos. 1293 expected_errnos = [ errno.ECONNREFUSED, ] 1294 if hasattr(errno, 'ENETUNREACH'): 1295 expected_errnos.append(errno.ENETUNREACH) 1296 1297 self.assertIn(cm.exception.errno, expected_errnos) 1298 1299 def test_create_connection_timeout(self): 1300 # Issue #9792: create_connection() should not recast timeout errors 1301 # as generic socket errors. 1302 with self.mocked_socket_module(): 1303 with self.assertRaises(socket.timeout): 1304 socket.create_connection((HOST, 1234)) 1305 1306 1307 @unittest.skipUnless(thread, 'Threading required for this test.') 1308 class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 1309 1310 def __init__(self, methodName='runTest'): 1311 SocketTCPTest.__init__(self, methodName=methodName) 1312 ThreadableTest.__init__(self) 1313 1314 def clientSetUp(self): 1315 self.source_port = test_support.find_unused_port() 1316 1317 def clientTearDown(self): 1318 self.cli.close() 1319 self.cli = None 1320 ThreadableTest.clientTearDown(self) 1321 1322 def _justAccept(self): 1323 conn, addr = self.serv.accept() 1324 conn.close() 1325 1326 testFamily = _justAccept 1327 def _testFamily(self): 1328 self.cli = socket.create_connection((HOST, self.port), timeout=30) 1329 self.addCleanup(self.cli.close) 1330 self.assertEqual(self.cli.family, 2) 1331 1332 testSourceAddress = _justAccept 1333 def _testSourceAddress(self): 1334 self.cli = socket.create_connection((HOST, self.port), timeout=30, 1335 source_address=('', self.source_port)) 1336 self.addCleanup(self.cli.close) 1337 self.assertEqual(self.cli.getsockname()[1], self.source_port) 1338 # The port number being used is sufficient to show that the bind() 1339 # call happened. 1340 1341 testTimeoutDefault = _justAccept 1342 def _testTimeoutDefault(self): 1343 # passing no explicit timeout uses socket's global default 1344 self.assertTrue(socket.getdefaulttimeout() is None) 1345 socket.setdefaulttimeout(42) 1346 try: 1347 self.cli = socket.create_connection((HOST, self.port)) 1348 self.addCleanup(self.cli.close) 1349 finally: 1350 socket.setdefaulttimeout(None) 1351 self.assertEqual(self.cli.gettimeout(), 42) 1352 1353 testTimeoutNone = _justAccept 1354 def _testTimeoutNone(self): 1355 # None timeout means the same as sock.settimeout(None) 1356 self.assertTrue(socket.getdefaulttimeout() is None) 1357 socket.setdefaulttimeout(30) 1358 try: 1359 self.cli = socket.create_connection((HOST, self.port), timeout=None) 1360 self.addCleanup(self.cli.close) 1361 finally: 1362 socket.setdefaulttimeout(None) 1363 self.assertEqual(self.cli.gettimeout(), None) 1364 1365 testTimeoutValueNamed = _justAccept 1366 def _testTimeoutValueNamed(self): 1367 self.cli = socket.create_connection((HOST, self.port), timeout=30) 1368 self.assertEqual(self.cli.gettimeout(), 30) 1369 1370 testTimeoutValueNonamed = _justAccept 1371 def _testTimeoutValueNonamed(self): 1372 self.cli = socket.create_connection((HOST, self.port), 30) 1373 self.addCleanup(self.cli.close) 1374 self.assertEqual(self.cli.gettimeout(), 30) 1375 1376 @unittest.skipUnless(thread, 'Threading required for this test.') 1377 class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 1378 1379 def __init__(self, methodName='runTest'): 1380 SocketTCPTest.__init__(self, methodName=methodName) 1381 ThreadableTest.__init__(self) 1382 1383 def clientSetUp(self): 1384 pass 1385 1386 def clientTearDown(self): 1387 self.cli.close() 1388 self.cli = None 1389 ThreadableTest.clientTearDown(self) 1390 1391 def testInsideTimeout(self): 1392 conn, addr = self.serv.accept() 1393 self.addCleanup(conn.close) 1394 time.sleep(3) 1395 conn.send("done!") 1396 testOutsideTimeout = testInsideTimeout 1397 1398 def _testInsideTimeout(self): 1399 self.cli = sock = socket.create_connection((HOST, self.port)) 1400 data = sock.recv(5) 1401 self.assertEqual(data, "done!") 1402 1403 def _testOutsideTimeout(self): 1404 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 1405 self.assertRaises(socket.timeout, lambda: sock.recv(5)) 1406 1407 1408 class Urllib2FileobjectTest(unittest.TestCase): 1409 1410 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that 1411 # it close the socket if the close c'tor argument is true 1412 1413 def testClose(self): 1414 class MockSocket: 1415 closed = False 1416 def flush(self): pass 1417 def close(self): self.closed = True 1418 1419 # must not close unless we request it: the original use of _fileobject 1420 # by module socket requires that the underlying socket not be closed until 1421 # the _socketobject that created the _fileobject is closed 1422 s = MockSocket() 1423 f = socket._fileobject(s) 1424 f.close() 1425 self.assertTrue(not s.closed) 1426 1427 s = MockSocket() 1428 f = socket._fileobject(s, close=True) 1429 f.close() 1430 self.assertTrue(s.closed) 1431 1432 class TCPTimeoutTest(SocketTCPTest): 1433 1434 def testTCPTimeout(self): 1435 def raise_timeout(*args, **kwargs): 1436 self.serv.settimeout(1.0) 1437 self.serv.accept() 1438 self.assertRaises(socket.timeout, raise_timeout, 1439 "Error generating a timeout exception (TCP)") 1440 1441 def testTimeoutZero(self): 1442 ok = False 1443 try: 1444 self.serv.settimeout(0.0) 1445 foo = self.serv.accept() 1446 except socket.timeout: 1447 self.fail("caught timeout instead of error (TCP)") 1448 except socket.error: 1449 ok = True 1450 except: 1451 self.fail("caught unexpected exception (TCP)") 1452 if not ok: 1453 self.fail("accept() returned success when we did not expect it") 1454 1455 def testInterruptedTimeout(self): 1456 # XXX I don't know how to do this test on MSWindows or any other 1457 # plaform that doesn't support signal.alarm() or os.kill(), though 1458 # the bug should have existed on all platforms. 1459 if not hasattr(signal, "alarm"): 1460 return # can only test on *nix 1461 self.serv.settimeout(5.0) # must be longer than alarm 1462 class Alarm(Exception): 1463 pass 1464 def alarm_handler(signal, frame): 1465 raise Alarm 1466 old_alarm = signal.signal(signal.SIGALRM, alarm_handler) 1467 try: 1468 signal.alarm(2) # POSIX allows alarm to be up to 1 second early 1469 try: 1470 foo = self.serv.accept() 1471 except socket.timeout: 1472 self.fail("caught timeout instead of Alarm") 1473 except Alarm: 1474 pass 1475 except: 1476 self.fail("caught other exception instead of Alarm:" 1477 " %s(%s):\n%s" % 1478 (sys.exc_info()[:2] + (traceback.format_exc(),))) 1479 else: 1480 self.fail("nothing caught") 1481 finally: 1482 signal.alarm(0) # shut off alarm 1483 except Alarm: 1484 self.fail("got Alarm in wrong place") 1485 finally: 1486 # no alarm can be pending. Safe to restore old handler. 1487 signal.signal(signal.SIGALRM, old_alarm) 1488 1489 class UDPTimeoutTest(SocketUDPTest): 1490 1491 def testUDPTimeout(self): 1492 def raise_timeout(*args, **kwargs): 1493 self.serv.settimeout(1.0) 1494 self.serv.recv(1024) 1495 self.assertRaises(socket.timeout, raise_timeout, 1496 "Error generating a timeout exception (UDP)") 1497 1498 def testTimeoutZero(self): 1499 ok = False 1500 try: 1501 self.serv.settimeout(0.0) 1502 foo = self.serv.recv(1024) 1503 except socket.timeout: 1504 self.fail("caught timeout instead of error (UDP)") 1505 except socket.error: 1506 ok = True 1507 except: 1508 self.fail("caught unexpected exception (UDP)") 1509 if not ok: 1510 self.fail("recv() returned success when we did not expect it") 1511 1512 class TestExceptions(unittest.TestCase): 1513 1514 def testExceptionTree(self): 1515 self.assertTrue(issubclass(socket.error, Exception)) 1516 self.assertTrue(issubclass(socket.herror, socket.error)) 1517 self.assertTrue(issubclass(socket.gaierror, socket.error)) 1518 self.assertTrue(issubclass(socket.timeout, socket.error)) 1519 1520 class TestLinuxAbstractNamespace(unittest.TestCase): 1521 1522 UNIX_PATH_MAX = 108 1523 1524 def testLinuxAbstractNamespace(self): 1525 address = "\x00python-test-hello\x00\xff" 1526 s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1527 s1.bind(address) 1528 s1.listen(1) 1529 s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1530 s2.connect(s1.getsockname()) 1531 s1.accept() 1532 self.assertEqual(s1.getsockname(), address) 1533 self.assertEqual(s2.getpeername(), address) 1534 1535 def testMaxName(self): 1536 address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1) 1537 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1538 s.bind(address) 1539 self.assertEqual(s.getsockname(), address) 1540 1541 def testNameOverflow(self): 1542 address = "\x00" + "h" * self.UNIX_PATH_MAX 1543 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1544 self.assertRaises(socket.error, s.bind, address) 1545 1546 1547 @unittest.skipUnless(thread, 'Threading required for this test.') 1548 class BufferIOTest(SocketConnectedTest): 1549 """ 1550 Test the buffer versions of socket.recv() and socket.send(). 1551 """ 1552 def __init__(self, methodName='runTest'): 1553 SocketConnectedTest.__init__(self, methodName=methodName) 1554 1555 def testRecvIntoArray(self): 1556 buf = array.array('c', ' '*1024) 1557 nbytes = self.cli_conn.recv_into(buf) 1558 self.assertEqual(nbytes, len(MSG)) 1559 msg = buf.tostring()[:len(MSG)] 1560 self.assertEqual(msg, MSG) 1561 1562 def _testRecvIntoArray(self): 1563 with test_support.check_py3k_warnings(): 1564 buf = buffer(MSG) 1565 self.serv_conn.send(buf) 1566 1567 def testRecvIntoBytearray(self): 1568 buf = bytearray(1024) 1569 nbytes = self.cli_conn.recv_into(buf) 1570 self.assertEqual(nbytes, len(MSG)) 1571 msg = buf[:len(MSG)] 1572 self.assertEqual(msg, MSG) 1573 1574 _testRecvIntoBytearray = _testRecvIntoArray 1575 1576 def testRecvIntoMemoryview(self): 1577 buf = bytearray(1024) 1578 nbytes = self.cli_conn.recv_into(memoryview(buf)) 1579 self.assertEqual(nbytes, len(MSG)) 1580 msg = buf[:len(MSG)] 1581 self.assertEqual(msg, MSG) 1582 1583 _testRecvIntoMemoryview = _testRecvIntoArray 1584 1585 def testRecvFromIntoArray(self): 1586 buf = array.array('c', ' '*1024) 1587 nbytes, addr = self.cli_conn.recvfrom_into(buf) 1588 self.assertEqual(nbytes, len(MSG)) 1589 msg = buf.tostring()[:len(MSG)] 1590 self.assertEqual(msg, MSG) 1591 1592 def _testRecvFromIntoArray(self): 1593 with test_support.check_py3k_warnings(): 1594 buf = buffer(MSG) 1595 self.serv_conn.send(buf) 1596 1597 def testRecvFromIntoBytearray(self): 1598 buf = bytearray(1024) 1599 nbytes, addr = self.cli_conn.recvfrom_into(buf) 1600 self.assertEqual(nbytes, len(MSG)) 1601 msg = buf[:len(MSG)] 1602 self.assertEqual(msg, MSG) 1603 1604 _testRecvFromIntoBytearray = _testRecvFromIntoArray 1605 1606 def testRecvFromIntoMemoryview(self): 1607 buf = bytearray(1024) 1608 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) 1609 self.assertEqual(nbytes, len(MSG)) 1610 msg = buf[:len(MSG)] 1611 self.assertEqual(msg, MSG) 1612 1613 _testRecvFromIntoMemoryview = _testRecvFromIntoArray 1614 1615 1616 TIPC_STYPE = 2000 1617 TIPC_LOWER = 200 1618 TIPC_UPPER = 210 1619 1620 def isTipcAvailable(): 1621 """Check if the TIPC module is loaded 1622 1623 The TIPC module is not loaded automatically on Ubuntu and probably 1624 other Linux distros. 1625 """ 1626 if not hasattr(socket, "AF_TIPC"): 1627 return False 1628 if not os.path.isfile("/proc/modules"): 1629 return False 1630 with open("/proc/modules") as f: 1631 for line in f: 1632 if line.startswith("tipc "): 1633 return True 1634 if test_support.verbose: 1635 print "TIPC module is not loaded, please 'sudo modprobe tipc'" 1636 return False 1637 1638 class TIPCTest (unittest.TestCase): 1639 def testRDM(self): 1640 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 1641 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 1642 1643 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1644 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 1645 TIPC_LOWER, TIPC_UPPER) 1646 srv.bind(srvaddr) 1647 1648 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 1649 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) 1650 cli.sendto(MSG, sendaddr) 1651 1652 msg, recvaddr = srv.recvfrom(1024) 1653 1654 self.assertEqual(cli.getsockname(), recvaddr) 1655 self.assertEqual(msg, MSG) 1656 1657 1658 class TIPCThreadableTest (unittest.TestCase, ThreadableTest): 1659 def __init__(self, methodName = 'runTest'): 1660 unittest.TestCase.__init__(self, methodName = methodName) 1661 ThreadableTest.__init__(self) 1662 1663 def setUp(self): 1664 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 1665 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1666 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 1667 TIPC_LOWER, TIPC_UPPER) 1668 self.srv.bind(srvaddr) 1669 self.srv.listen(5) 1670 self.serverExplicitReady() 1671 self.conn, self.connaddr = self.srv.accept() 1672 1673 def clientSetUp(self): 1674 # The is a hittable race between serverExplicitReady() and the 1675 # accept() call; sleep a little while to avoid it, otherwise 1676 # we could get an exception 1677 time.sleep(0.1) 1678 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 1679 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 1680 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) 1681 self.cli.connect(addr) 1682 self.cliaddr = self.cli.getsockname() 1683 1684 def testStream(self): 1685 msg = self.conn.recv(1024) 1686 self.assertEqual(msg, MSG) 1687 self.assertEqual(self.cliaddr, self.connaddr) 1688 1689 def _testStream(self): 1690 self.cli.send(MSG) 1691 self.cli.close() 1692 1693 1694 def test_main(): 1695 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, 1696 TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, 1697 UDPTimeoutTest ] 1698 1699 tests.extend([ 1700 NonBlockingTCPTests, 1701 FileObjectClassTestCase, 1702 FileObjectInterruptedTestCase, 1703 UnbufferedFileObjectClassTestCase, 1704 LineBufferedFileObjectClassTestCase, 1705 SmallBufferedFileObjectClassTestCase, 1706 Urllib2FileobjectTest, 1707 NetworkConnectionNoServer, 1708 NetworkConnectionAttributesTest, 1709 NetworkConnectionBehaviourTest, 1710 ]) 1711 if hasattr(socket, "socketpair"): 1712 tests.append(BasicSocketPairTest) 1713 if sys.platform == 'linux2': 1714 tests.append(TestLinuxAbstractNamespace) 1715 if isTipcAvailable(): 1716 tests.append(TIPCTest) 1717 tests.append(TIPCThreadableTest) 1718 1719 thread_info = test_support.threading_setup() 1720 test_support.run_unittest(*tests) 1721 test_support.threading_cleanup(*thread_info) 1722 1723 if __name__ == "__main__": 1724 test_main() 1725