Home | History | Annotate | Download | only in cros
      1 #!/usr/bin/python
      2 
      3 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 import logging
      8 import socket
      9 import sys
     10 import time
     11 
     12 import common
     13 
     14 from autotest_lib.client.cros import dhcp_handling_rule
     15 from autotest_lib.client.cros import dhcp_packet
     16 from autotest_lib.client.cros import dhcp_test_server
     17 
     18 TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"
     19 
     20 TEST_CLASSLESS_STATIC_ROUTE_DATA = \
     21         "\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \
     22         "\x00\xc0\xa8\x00\xfe"
     23 
     24 TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [
     25         (18, "10.9.192.0", "172.31.155.10"),
     26         (0, "0.0.0.0", "192.168.0.254")
     27         ]
     28 
     29 TEST_DOMAIN_SEARCH_LIST_COMPRESSED = \
     30         "\x03eng\x06google\x03com\x00\x09marketing\xC0\x04"
     31 
     32 TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com")
     33 
     34 # At this time, we don't support the compression allowed in the RFC.
     35 # This is correct and sufficient for our purposes.
     36 TEST_DOMAIN_SEARCH_LIST_EXPECTED = \
     37         "\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00"
     38 
     39 def bin2hex(byte_str, justification=20):
     40     """
     41     Turn big hex strings into prettier strings of hex bytes.  Group those hex
     42     bytes into lines justification bytes long.
     43     """
     44     chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str]
     45     groups = []
     46     for i in xrange(0, len(chars), justification):
     47         groups.append("".join(chars[i:i+justification]))
     48     return "\n".join(groups)
     49 
     50 def test_packet_serialization():
     51     log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb")
     52     binary_discovery_packet = log_file.read()
     53     log_file.close()
     54     discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
     55     if not discovery_packet.is_valid:
     56         return False
     57     generated_string = discovery_packet.to_binary_string()
     58     if generated_string is None:
     59         print "Failed to generate string from packet object."
     60         return False
     61     if generated_string != binary_discovery_packet:
     62         print "Packets didn't match: "
     63         print "Generated: \n%s" % bin2hex(generated_string)
     64         print "Expected: \n%s" % bin2hex(binary_discovery_packet)
     65         return False
     66     print "test_packet_serialization PASSED"
     67     return True
     68 
     69 def test_classless_static_route_parsing():
     70     parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack(
     71             TEST_CLASSLESS_STATIC_ROUTE_DATA)
     72     if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED:
     73         print ("Parsed binary domain list and got %s but expected %s" %
     74                (repr(parsed_routes),
     75                 repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)))
     76         return False
     77     print "test_classless_static_route_parsing PASSED"
     78     return True
     79 
     80 def test_classless_static_route_serialization():
     81     byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack(
     82             TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)
     83     if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA:
     84         # Turn the strings into printable hex strings on a single line.
     85         pretty_actual = bin2hex(byte_string, 100)
     86         pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100)
     87         print ("Expected to serialize %s to %s but instead got %s." %
     88                (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected,
     89                      pretty_actual))
     90         return False
     91     print "test_classless_static_route_serialization PASSED"
     92     return True
     93 
     94 def test_domain_search_list_parsing():
     95     parsed_domains = dhcp_packet.DomainListOption.unpack(
     96             TEST_DOMAIN_SEARCH_LIST_COMPRESSED)
     97     # Order matters too.
     98     parsed_domains = tuple(parsed_domains)
     99     if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED:
    100         print ("Parsed binary domain list and got %s but expected %s" %
    101                (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED))
    102         return False
    103     print "test_domain_search_list_parsing PASSED"
    104     return True
    105 
    106 def test_domain_search_list_serialization():
    107     byte_string = dhcp_packet.DomainListOption.pack(
    108             TEST_DOMAIN_SEARCH_LIST_PARSED)
    109     if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED:
    110         # Turn the strings into printable hex strings on a single line.
    111         pretty_actual = bin2hex(byte_string, 100)
    112         pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100)
    113         print ("Expected to serialize %s to %s but instead got %s." %
    114                (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual))
    115         return False
    116     print "test_domain_search_list_serialization PASSED"
    117     return True
    118 
    119 def receive_packet(a_socket, timeout_seconds=1.0):
    120     data = None
    121     start_time = time.time()
    122     while data is None and start_time + timeout_seconds > time.time():
    123         try:
    124             data, _ = a_socket.recvfrom(1024)
    125         except socket.timeout:
    126             pass # We expect many timeouts.
    127     if data is None:
    128         print "Timed out before we received a response from the server."
    129         return None
    130 
    131     print "Client received a packet of length %d from the server." % len(data)
    132     packet = dhcp_packet.DhcpPacket(byte_str=data)
    133     if not packet.is_valid:
    134         print "Received an invalid response from DHCP server."
    135         return None
    136 
    137     return packet
    138 
    139 def test_simple_server_exchange(server):
    140     intended_ip = "127.0.0.42"
    141     subnet_mask = "255.255.255.0"
    142     server_ip = "127.0.0.1"
    143     lease_time_seconds = 60
    144     test_timeout = 3.0
    145     mac_addr = "\x01\x02\x03\x04\x05\x06"
    146     # Build up our packets and have them request some default option values,
    147     # like the IP we're being assigned and the address of the server assigning
    148     # it.
    149     discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr)
    150     discovery_message.set_option(
    151             dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
    152             dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    153     request_message = dhcp_packet.DhcpPacket.create_request_packet(
    154             discovery_message.transaction_id,
    155             mac_addr)
    156     request_message.set_option(
    157             dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
    158             dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    159     # This is the pool of settings the DHCP server will seem to draw from to
    160     # answer queries from the client.  This information is written into packets
    161     # through the handling rules.
    162     dhcp_server_config = {
    163             dhcp_packet.OPTION_SERVER_ID : server_ip,
    164             dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
    165             dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds,
    166             dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
    167             }
    168     # Build up the handling rules for the server and start the test.
    169     rules = []
    170     rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
    171             intended_ip,
    172             server_ip,
    173             dhcp_server_config))
    174     rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
    175             intended_ip,
    176             server_ip,
    177             dhcp_server_config))
    178     rules[-1].is_final_handler = True
    179     server.start_test(rules, test_timeout)
    180     # Because we don't want to require root permissions to run these tests,
    181     # listen on the loopback device, don't broadcast, and don't use reserved
    182     # ports (like the actual DHCP ports).  Use 8068/8067 instead.
    183     client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    184     client_socket.bind(("127.0.0.1", 8068))
    185     client_socket.settimeout(0.1)
    186     client_socket.sendto(discovery_message.to_binary_string(),
    187                          (server_ip, 8067))
    188 
    189     offer_packet = receive_packet(client_socket)
    190     if offer_packet is None:
    191         return False
    192 
    193     if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER):
    194         print "Type of DHCP response is not offer."
    195         return False
    196 
    197     if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
    198         print "Server didn't offer the IP we expected."
    199         return False
    200 
    201     print "Offer looks good to the client, sending request."
    202     # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages.  In
    203     # our unit test, we have to do this ourselves.
    204     request_message.set_option(
    205             dhcp_packet.OPTION_SERVER_ID,
    206             offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID))
    207     request_message.set_option(
    208             dhcp_packet.OPTION_SUBNET_MASK,
    209             offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK))
    210     request_message.set_option(
    211             dhcp_packet.OPTION_IP_LEASE_TIME,
    212             offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME))
    213     request_message.set_option(
    214             dhcp_packet.OPTION_REQUESTED_IP,
    215             offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP))
    216     # Send the REQUEST message.
    217     client_socket.sendto(request_message.to_binary_string(),
    218                          (server_ip, 8067))
    219     ack_packet = receive_packet(client_socket)
    220     if ack_packet is None:
    221         return False
    222 
    223     if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK):
    224         print "Type of DHCP response is not acknowledgement."
    225         return False
    226 
    227     if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
    228         print "Server didn't give us the IP we expected."
    229         return False
    230 
    231     print "Waiting for the server to finish."
    232     server.wait_for_test_to_finish()
    233     print "Server agrees that the test is over."
    234     if not server.last_test_passed:
    235         print "Server is unhappy with the test result."
    236         return False
    237 
    238     print "test_simple_server_exchange PASSED."
    239     return True
    240 
    241 def test_server_dialogue():
    242     server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
    243                                              ingress_port=8067,
    244                                              broadcast_address="127.0.0.1",
    245                                              broadcast_port=8068)
    246     server.start()
    247     ret = False
    248     if server.is_healthy:
    249         ret = test_simple_server_exchange(server)
    250     else:
    251         print "Server isn't healthy, aborting."
    252     print "Sending server stop() signal."
    253     server.stop()
    254     print "Stop signal sent."
    255     return ret
    256 
    257 def run_tests():
    258     logger = logging.getLogger("dhcp")
    259     logger.setLevel(logging.DEBUG)
    260     stream_handler = logging.StreamHandler()
    261     stream_handler.setLevel(logging.DEBUG)
    262     logger.addHandler(stream_handler)
    263     retval = test_packet_serialization()
    264     retval &= test_classless_static_route_parsing()
    265     retval &= test_classless_static_route_serialization()
    266     retval &= test_domain_search_list_parsing()
    267     retval &= test_domain_search_list_serialization()
    268     retval &= test_server_dialogue()
    269     if retval:
    270         print "All tests PASSED."
    271         return 0
    272     else:
    273         print "Some tests FAILED"
    274         return -1
    275 
    276 if __name__ == "__main__":
    277     sys.exit(run_tests())
    278