Home | History | Annotate | Download | only in network_DhcpNak
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 from autotest_lib.client.common_lib import error
      6 from autotest_lib.client.cros import dhcp_handling_rule
      7 from autotest_lib.client.cros import dhcp_packet
      8 from autotest_lib.client.cros import dhcp_test_base
      9 
     10 # Length of time the lease from the DHCP server is valid.
     11 LEASE_TIME_SECONDS = 3600
     12 # We'll fill in the subnet and give this address to the client.
     13 INTENDED_IP_SUFFIX = "0.0.0.101"
     14 
     15 class network_DhcpNak(dhcp_test_base.DhcpTestBase):
     16     """
     17     Tests a DHCP client's handling of NAK messages.
     18 
     19     Negotiates a lease, and then tests how the DHCP client processes
     20     NAKs in two scenarios. In the first scenario, the DHCP client
     21     is started anew, but with the cached lease on disk. In the
     22     second scenario, an already-running DHCP client is asked to
     23     renew its lease.
     24 
     25     In both scenarios, the NAK messages omit the DHCP server-id
     26     option. This is to emulate some DHCP servers (e.g. OpenBSD 4.6),
     27     which omit the DHCP server-id in NAK messages.
     28     """
     29 
     30     def common_setup(self):
     31         """
     32         Run common setup steps.
     33         """
     34         subnet_mask = self.ethernet_pair.interface_subnet_mask
     35         self.intended_ip = dhcp_test_base.DhcpTestBase.rewrite_ip_suffix(
     36             subnet_mask, self.server_ip, INTENDED_IP_SUFFIX)
     37         self.interface_name = self.ethernet_pair.peer_interface_name
     38         self.dhcp_options = {
     39             dhcp_packet.OPTION_SERVER_ID : self.server_ip,
     40             dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
     41             dhcp_packet.OPTION_IP_LEASE_TIME : LEASE_TIME_SECONDS,
     42             dhcp_packet.OPTION_REQUESTED_IP : self.intended_ip,
     43             dhcp_packet.OPTION_DNS_SERVERS : [],
     44             dhcp_packet.OPTION_DOMAIN_NAME : '',
     45             dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST : [],
     46             }
     47         self.negotiate_and_check_lease(self.dhcp_options)
     48 
     49     def reconnect_service(self):
     50         """
     51         Disconnect and reconnect Ethernet.
     52 
     53         Ask shill to disconnect and reconnect the Service for our
     54         virtual Ethernet link. This causes shill to shut down and
     55         restart dhcpcd for the link.
     56         """
     57         service = self.find_ethernet_service(self.interface_name)
     58         service.Disconnect()
     59         rules = [
     60             # Respond to DISCOVERY, but then NAK the REQUEST.
     61             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
     62                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
     63             dhcp_handling_rule.DhcpHandlingRule_RejectRequest(),
     64 
     65             # Allow a successful negotiation the second time around.
     66             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
     67                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
     68             dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
     69                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
     70             ]
     71         rules[-1].is_final_handler = True
     72         self.server.start_test(
     73             rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS)
     74         service.Connect()
     75 
     76     def force_dhcp_renew(self):
     77         """
     78         Force a DHCP renewal.
     79 
     80         Ask shill to Refresh the configuration for the IPConfig object
     81         associated with our virtual Ethernet link. This causes shill
     82         to ask dhcpcd to renew its DHCP lease.
     83         """
     84         rules = [
     85             # Reject REQUEST from renewal attempt.
     86             dhcp_handling_rule.DhcpHandlingRule_RejectRequest(),
     87 
     88             # Allow a successful negotiation after that.
     89             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
     90                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
     91             dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
     92                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
     93             ]
     94         rules[-1].is_final_handler = True
     95         self.server.start_test(
     96             rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS)
     97         self.get_interface_ipconfig_objects(self.interface_name)[0].Refresh()
     98 
     99     def send_ack_then_nak(self):
    100         """
    101         Send an ACK followed by a NAK on re-connect to Ethernet.
    102 
    103         Ask shill to disconnect and reconnect the Service for our
    104         virtual Ethernet link. This causes shill to shut down and
    105         restart dhcpcd for the link.  Then perform a test where
    106         the server responds to a REQUEST with an ACK followed by
    107         an ACK.
    108         """
    109         service = self.find_ethernet_service(self.interface_name)
    110         service.Disconnect()
    111         rules = [
    112             # Respond to DISCOVERY, but then both ACK and NAK the REQUEST.
    113             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
    114                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
    115             dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest(
    116                 self.intended_ip, self.server_ip, self.dhcp_options, {},
    117                 False),
    118             ]
    119         rules[-1].is_final_handler = True
    120         self.server.start_test(
    121             rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS)
    122         service.Connect()
    123 
    124     def send_nak_then_ack_with_conflict(self):
    125         """
    126         Send an NAK followed by an ACK on re-connect to with address conflict.
    127 
    128         Ask shill to disconnect and reconnect the Service for our
    129         virtual Ethernet link. This causes shill to shut down and
    130         restart dhcpcd for the link.
    131 
    132         On reconnect, perform a test where the server responds to a
    133         REQUEST with a NAK followed by an ACK, however with a lease
    134         for an invalid address (the same IP address as the DHCP server).
    135 
    136         Ensure that the client rejects the invalid lease with a DECLINE,
    137         and that it also ignores the first OFFER for the same invalid
    138         address.
    139         """
    140         service = self.find_ethernet_service(self.interface_name)
    141         service.Disconnect()
    142         rules = [
    143             # Respond to DISCOVERY, but then both NAK then ACK the REQUEST,
    144             # supplying the server's own IP address.
    145             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
    146                 self.server_ip, self.server_ip, self.dhcp_options, {}),
    147             dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest(
    148                 self.server_ip, self.server_ip, self.dhcp_options, {},
    149                 True),
    150 
    151             # The client should eventually reject this lease since this
    152             # address is in use.
    153             dhcp_handling_rule.DhcpHandlingRule_AcceptDecline(
    154                 self.server_ip, self.dhcp_options, {}),
    155 
    156             # Offer up the same (invalid) IP address.
    157             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
    158                 self.server_ip, self.server_ip, self.dhcp_options, {}),
    159 
    160             # The client should ignore the previous offer and perform
    161             # another DISCOVER request.
    162             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
    163                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
    164             dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
    165                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
    166             ]
    167         rules[-1].is_final_handler = True
    168         self.server.start_test(
    169             rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS)
    170         service.Connect()
    171 
    172     def send_nak_then_ack_then_verify(self):
    173         """
    174         Send an NAK followed by an ACK then verify client IP address.
    175 
    176         Ask shill to disconnect and reconnect the Service for our
    177         virtual Ethernet link. This causes shill to shut down and
    178         restart dhcpcd for the link.
    179 
    180         On reconnect, perform a test where the server responds to a
    181         REQUEST with a NAK followed by an ACK.  This method asserts
    182         that the client does not DECLINE this address.
    183         """
    184         service = self.find_ethernet_service(self.interface_name)
    185         service.Disconnect()
    186 
    187         # This rule serves two purposes: First it asserts that the client
    188         # does not send a DECLINE response.  Second, it waits until the
    189         # test timeout, by which time client will have completed an "ARP
    190         # self" operation to validate the offered IP adddres.
    191         decline_rule = dhcp_handling_rule.DhcpHandlingRule_AcceptDecline(
    192             self.intended_ip, self.dhcp_options, {})
    193 
    194         rules = [
    195             # Respond to DISCOVERY, but then both NAK then ACK the REQUEST,
    196             # supplying the server's own IP address.
    197             dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
    198                 self.intended_ip, self.server_ip, self.dhcp_options, {}),
    199             dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest(
    200                 self.intended_ip, self.server_ip, self.dhcp_options, {},
    201                 True),
    202             decline_rule
    203             ]
    204         rules[-1].is_final_handler = True
    205         self.server.start_test(
    206             rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS)
    207         service.Connect()
    208         self.server.wait_for_test_to_finish()
    209 
    210         # This is a negative test, since we expect the last rule to fail.
    211         if self.server.last_test_passed:
    212             raise error.TestFail('DHCP DECLINE message was received')
    213         elif self.server.current_rule != decline_rule:
    214             raise error.TestFail('Failed on %s rule' % self.server.current_rule)
    215 
    216         dhcp_config = self.get_interface_ipconfig(
    217                 self.ethernet_pair.peer_interface_name)
    218         if dhcp_config is None:
    219             raise error.TestFail('Did not get a DHCP config')
    220         if dhcp_config[dhcp_test_base.DHCPCD_KEY_ADDRESS] != self.intended_ip:
    221             raise error.TestFail('Client did not attain expected address %s' %
    222                                  self.intended_ip)
    223 
    224     def test_body(self):
    225         """
    226         Entry point for this test.
    227 
    228         This is called from DhcpTestBase.run_once().
    229         """
    230         self.common_setup()
    231         for sub_test in (self.reconnect_service,
    232                          self.force_dhcp_renew,
    233                          self.send_ack_then_nak,
    234                          self.send_nak_then_ack_with_conflict):
    235             sub_test()
    236             self.server.wait_for_test_to_finish()
    237             if not self.server.last_test_passed:
    238                 raise error.TestFail('Test failed (%s): active rule is %s' % (
    239                         sub_test.__name__, self.server.current_rule))
    240 
    241         # This method is outside the loop above since it performs its own
    242         # special verification.
    243         self.send_nak_then_ack_then_verify()
    244