Home | History | Annotate | Download | only in test

Lines Matching refs:socket

6 import socket
24 def try_address(host, port=0, family=socket.AF_INET):
25 """Try to bind a socket on the given host:port and return True
28 sock = socket.socket(family, socket.SOCK_STREAM)
30 except (socket.error, socket.gaierror):
38 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
53 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
64 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
176 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
190 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
232 self.serv, self.cli = socket.socketpair()
254 s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
256 expected = ('<socket object, fd=%s, family=%s, type=%s, protocol=%s>'
261 expected = ('<socket object, fd=-1, family=%s, type=%s, protocol=%s>'
266 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
276 self.fail('Socket proxy still exists')
279 s = socket.socket()._sock
287 # Testing socket module exceptions
289 raise socket.error
291 raise socket.herror
293 raise socket.gaierror
294 self.assertRaises(socket.error, raise_error,
295 "Error raising socket exception.")
296 self.assertRaises(socket.error, raise_herror,
297 "Error raising socket exception.")
298 self.assertRaises(socket.error, raise_gaierror,
299 "Error raising socket exception.")
303 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
342 socket.AF_INET
343 socket.SOCK_STREAM
344 socket.SOCK_DGRAM
345 socket.SOCK_RAW
346 socket.SOCK_RDM
347 socket.SOCK_SEQPACKET
348 socket.SOL_SOCKET
349 socket.SO_REUSEADDR
353 hostname = socket.gethostname()
355 ip = socket.gethostbyname(hostname)
356 except socket.error:
361 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
362 except socket.error:
366 fqhn = socket.getfqdn(ip)
377 socket.getnameinfo(__name__,0)
380 "socket.getnameinfo loses a reference")
386 socket.getnameinfo(('x', 0, 0, 0), 0)
387 except socket.error:
393 sizes = {socket.htonl: 32, socket.ntohl: 32,
394 socket.htons: 16, socket.ntohs: 16}
408 socket.ntohl(k)
409 socket.ntohs(k)
410 socket.htonl(k)
411 socket.htons(k)
413 self.assertRaises(OverflowError, socket.ntohl, k)
414 self.assertRaises(OverflowError, socket.ntohs, k)
415 self.assertRaises(OverflowError, socket.htonl, k)
416 self.assertRaises(OverflowError, socket.htons, k)
434 port = socket.getservbyname(service, 'tcp')
436 except socket.error:
439 raise socket.error
441 port2 = socket.getservbyname(service)
445 udpport = socket.getservbyname(service, 'udp')
446 except socket.error:
451 eq(socket.getservbyport(port2), service)
452 eq(socket.getservbyport(port, 'tcp'), service)
454 eq(socket.getservbyport(udpport, 'udp'), service)
456 self.assertRaises(OverflowError, socket.getservbyport, -1)
457 self.assertRaises(OverflowError, socket.getservbyport, 65536)
462 self.assertEqual(socket.getdefaulttimeout(), None)
463 s = socket.socket()
468 socket.setdefaulttimeout(10)
469 self.assertEqual(socket.getdefaulttimeout(), 10)
470 s = socket.socket()
475 socket.setdefaulttimeout(None)
476 self.assertEqual(socket.getdefaulttimeout(), None)
477 s = socket.socket()
482 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
485 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
487 @unittest.skipUnless(hasattr(socket, 'inet_aton'),
488 'test needs socket.inet_aton()')
492 self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
493 self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
495 @unittest.skipUnless(hasattr(socket, 'inet_pton'),
496 'test needs socket.inet_pton()')
498 from socket import inet_aton as f, inet_pton, AF_INET
512 @unittest.skipUnless(hasattr(socket, 'inet_pton'),
513 'test needs socket.inet_pton()')
516 from socket import inet_pton, AF_INET6, has_ipv6
520 self.skipTest('could not import needed symbols from socket')
531 @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
532 'test needs socket.inet_ntop()')
534 from socket import inet_ntoa as f, inet_ntop, AF_INET
546 @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
547 'test needs socket.inet_ntop()')
550 from socket import inet_ntop, AF_INET6, has_ipv6
554 self.skipTest('could not import needed symbols from socket')
567 """Use a temporary socket to elicit an unused ephemeral port.
574 tempsock = socket.socket()
583 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
591 my_ip_addr = socket.gethostbyname(socket.gethostname())
592 except socket.error:
600 # We know a socket should start without reuse==0
601 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
603 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
608 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
610 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
611 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
616 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
619 self.assertRaises(socket.error, sock.send, "spam")
623 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
624 self.assertEqual(sock.family, socket.AF_INET)
625 self.assertEqual(sock.type, socket.SOCK_STREAM)
630 sock = socket.socket()
651 self.assertTrue(hasattr(socket.socket, 'ioctl'))
652 self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
653 self.assertTrue(hasattr(socket, 'RCVALL_ON'))
654 self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
655 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
656 s = socket.socket()
659 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
663 socket.getaddrinfo('localhost', 80)
664 except socket.gaierror as err:
665 if err.errno == socket.EAI_SERVICE:
670 for info in socket.getaddrinfo(HOST, None):
674 socket.getaddrinfo('localhost', 80)
675 socket.getaddrinfo('127.0.0.1', 80)
676 socket.getaddrinfo(None, 80)
678 socket.getaddrinfo('::1', 80)
681 socket.getaddrinfo(HOST, "http")
682 socket.getaddrinfo(HOST, 80)
683 socket.getaddrinfo(HOST, 80L)
684 socket.getaddrinfo(HOST, None)
686 infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
688 self.assertEqual(family, socket.AF_INET)
689 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
691 self.assertEqual(socktype, socket.SOCK_STREAM)
693 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
694 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
697 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
698 socket.AI_PASSIVE)
701 if hasattr(socket, 'AI_NUMERICSERV'):
705 socket.getaddrinfo("localhost", None, 0, 0, 0,
706 socket.AI_NUMERICSERV)
707 except socket.gaierror:
712 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
713 self.skipTest("signal.alarm and socket.socketpair required for this test")
721 c, s = socket.socketpair()
733 self.assertRaises(socket.timeout, c.sendall,
748 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
757 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
764 self.assertRaises(OverflowError, socket.getnameinfo,
766 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
829 @unittest.skipUnless(hasattr(socket, 'fromfd'),
830 'socket.fromfd not available')
834 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
923 @unittest.skipUnless(hasattr(socket, 'socketpair'),
924 'test needs socket.socketpair()')
960 except socket.error:
986 except socket.error:
1016 except socket.error:
1026 self.fail("Error during select call to non-blocking socket.")
1152 raise socket.error(errno.EINTR)
1161 fo = socket._fileobject(mock_sock, **kwargs)
1172 fo = socket._fileobject(mock_sock, **kwargs)
1197 fo = socket._fileobject(mock_sock, bufsize=0)
1216 when reading multiple requests from the same socket."""
1306 self.cli = socket.create_connection((HOST, self.port))
1314 class MockSocket(socket.socket):
1316 raise socket.timeout('timed out')
1320 """Return a socket which times out on connect"""
1321 old_socket = socket.socket
1322 socket.socket = self.MockSocket
1326 socket.socket = old_socket
1330 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1332 with self.assertRaises(socket.error) as cm:
1340 with self.assertRaises(socket.error) as cm:
1341 socket.create_connection((HOST, port))
1346 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
1365 # as generic socket errors.
1367 with self.assertRaises(socket.timeout):
1368 socket.create_connection((HOST, 1234))
1392 self.cli = socket.create_connection((HOST, self.port), timeout=30)
1398 self.cli = socket.create_connection((HOST, self.port), timeout=30,
1407 # passing no explicit timeout uses socket's global default
1408 self.assertTrue(socket.getdefaulttimeout() is None)
1409 socket.setdefaulttimeout(42)
1411 self.cli = socket.create_connection((HOST, self.port))
1414 socket.setdefaulttimeout(None)
1420 self.assertTrue(socket.getdefaulttimeout() is None)
1421 socket.setdefaulttimeout(30)
1423 self.cli = socket.create_connection((HOST, self.port), timeout=None)
1426 socket.setdefaulttimeout(None)
1431 self.cli = socket.create_connection((HOST, self.port), timeout=30)
1436 self.cli = socket.create_connection((HOST, self.port), 30)
1463 self.cli = sock = socket.create_connection((HOST, self.port))
1468 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
1469 self.assertRaises(socket.timeout, lambda: sock.recv(5))
1474 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1475 # it close the socket if the close c'tor argument is true
1484 # by module socket requires that the underlying socket not be closed until
1487 f = socket._fileobject(s)
1492 f = socket._fileobject(s, close=True)
1502 self.assertRaises(socket.timeout, raise_timeout,
1510 except socket.timeout:
1512 except socket.error:
1535 except socket.timeout:
1559 self.assertRaises(socket.timeout, raise_timeout,
1567 except socket.timeout:
1569 except socket.error:
1579 self.assertTrue(issubclass(socket.error, Exception))
1580 self.assertTrue(issubclass(socket.herror, socket.error))
1581 self.assertTrue(issubclass(socket.gaierror, socket.error))
1582 self.assertTrue(issubclass(socket.timeout, socket.error))
1591 s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1594 s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1602 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1608 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1609 self.assertRaises(socket.error, s.bind, address)
1615 Test the buffer versions of socket.recv() and socket.send().
1708 if not hasattr(socket, "AF_TIPC"):
1722 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1723 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1725 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1726 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1730 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1748 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1749 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1750 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1762 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1763 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,