1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 from autotest_lib.client.common_lib import error 6 from autotest_lib.client.cros import dhcp_handling_rule 7 from autotest_lib.client.cros import dhcp_packet 8 from autotest_lib.client.cros import dhcp_test_base 9 10 # Length of time the lease from the DHCP server is valid. 11 LEASE_TIME_SECONDS = 3600 12 # We'll fill in the subnet and give this address to the client. 13 INTENDED_IP_SUFFIX = "0.0.0.101" 14 15 class network_DhcpNak(dhcp_test_base.DhcpTestBase): 16 """ 17 Tests a DHCP client's handling of NAK messages. 18 19 Negotiates a lease, and then tests how the DHCP client processes 20 NAKs in two scenarios. In the first scenario, the DHCP client 21 is started anew, but with the cached lease on disk. In the 22 second scenario, an already-running DHCP client is asked to 23 renew its lease. 24 25 In both scenarios, the NAK messages omit the DHCP server-id 26 option. This is to emulate some DHCP servers (e.g. OpenBSD 4.6), 27 which omit the DHCP server-id in NAK messages. 28 """ 29 30 def common_setup(self): 31 """ 32 Run common setup steps. 33 """ 34 subnet_mask = self.ethernet_pair.interface_subnet_mask 35 self.intended_ip = dhcp_test_base.DhcpTestBase.rewrite_ip_suffix( 36 subnet_mask, self.server_ip, INTENDED_IP_SUFFIX) 37 self.interface_name = self.ethernet_pair.peer_interface_name 38 self.dhcp_options = { 39 dhcp_packet.OPTION_SERVER_ID : self.server_ip, 40 dhcp_packet.OPTION_SUBNET_MASK : subnet_mask, 41 dhcp_packet.OPTION_IP_LEASE_TIME : LEASE_TIME_SECONDS, 42 dhcp_packet.OPTION_REQUESTED_IP : self.intended_ip, 43 dhcp_packet.OPTION_DNS_SERVERS : [], 44 dhcp_packet.OPTION_DOMAIN_NAME : '', 45 dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST : [], 46 } 47 self.negotiate_and_check_lease(self.dhcp_options) 48 49 def reconnect_service(self): 50 """ 51 Disconnect and reconnect Ethernet. 52 53 Ask shill to disconnect and reconnect the Service for our 54 virtual Ethernet link. This causes shill to shut down and 55 restart dhcpcd for the link. 56 """ 57 service = self.find_ethernet_service(self.interface_name) 58 service.Disconnect() 59 rules = [ 60 # Respond to DISCOVERY, but then NAK the REQUEST. 61 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 62 self.intended_ip, self.server_ip, self.dhcp_options, {}), 63 dhcp_handling_rule.DhcpHandlingRule_RejectRequest(), 64 65 # Allow a successful negotiation the second time around. 66 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 67 self.intended_ip, self.server_ip, self.dhcp_options, {}), 68 dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 69 self.intended_ip, self.server_ip, self.dhcp_options, {}), 70 ] 71 rules[-1].is_final_handler = True 72 self.server.start_test( 73 rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS) 74 service.Connect() 75 76 def force_dhcp_renew(self): 77 """ 78 Force a DHCP renewal. 79 80 Ask shill to Refresh the configuration for the IPConfig object 81 associated with our virtual Ethernet link. This causes shill 82 to ask dhcpcd to renew its DHCP lease. 83 """ 84 rules = [ 85 # Reject REQUEST from renewal attempt. 86 dhcp_handling_rule.DhcpHandlingRule_RejectRequest(), 87 88 # Allow a successful negotiation after that. 89 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 90 self.intended_ip, self.server_ip, self.dhcp_options, {}), 91 dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 92 self.intended_ip, self.server_ip, self.dhcp_options, {}), 93 ] 94 rules[-1].is_final_handler = True 95 self.server.start_test( 96 rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS) 97 self.get_interface_ipconfig_objects(self.interface_name)[0].Refresh() 98 99 def send_ack_then_nak(self): 100 """ 101 Send an ACK followed by a NAK on re-connect to Ethernet. 102 103 Ask shill to disconnect and reconnect the Service for our 104 virtual Ethernet link. This causes shill to shut down and 105 restart dhcpcd for the link. Then perform a test where 106 the server responds to a REQUEST with an ACK followed by 107 an ACK. 108 """ 109 service = self.find_ethernet_service(self.interface_name) 110 service.Disconnect() 111 rules = [ 112 # Respond to DISCOVERY, but then both ACK and NAK the REQUEST. 113 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 114 self.intended_ip, self.server_ip, self.dhcp_options, {}), 115 dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest( 116 self.intended_ip, self.server_ip, self.dhcp_options, {}, 117 False), 118 ] 119 rules[-1].is_final_handler = True 120 self.server.start_test( 121 rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS) 122 service.Connect() 123 124 def send_nak_then_ack_with_conflict(self): 125 """ 126 Send an NAK followed by an ACK on re-connect to with address conflict. 127 128 Ask shill to disconnect and reconnect the Service for our 129 virtual Ethernet link. This causes shill to shut down and 130 restart dhcpcd for the link. 131 132 On reconnect, perform a test where the server responds to a 133 REQUEST with a NAK followed by an ACK, however with a lease 134 for an invalid address (the same IP address as the DHCP server). 135 136 Ensure that the client rejects the invalid lease with a DECLINE, 137 and that it also ignores the first OFFER for the same invalid 138 address. 139 """ 140 service = self.find_ethernet_service(self.interface_name) 141 service.Disconnect() 142 rules = [ 143 # Respond to DISCOVERY, but then both NAK then ACK the REQUEST, 144 # supplying the server's own IP address. 145 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 146 self.server_ip, self.server_ip, self.dhcp_options, {}), 147 dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest( 148 self.server_ip, self.server_ip, self.dhcp_options, {}, 149 True), 150 151 # The client should eventually reject this lease since this 152 # address is in use. 153 dhcp_handling_rule.DhcpHandlingRule_AcceptDecline( 154 self.server_ip, self.dhcp_options, {}), 155 156 # Offer up the same (invalid) IP address. 157 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 158 self.server_ip, self.server_ip, self.dhcp_options, {}), 159 160 # The client should ignore the previous offer and perform 161 # another DISCOVER request. 162 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 163 self.intended_ip, self.server_ip, self.dhcp_options, {}), 164 dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 165 self.intended_ip, self.server_ip, self.dhcp_options, {}), 166 ] 167 rules[-1].is_final_handler = True 168 self.server.start_test( 169 rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS) 170 service.Connect() 171 172 def send_nak_then_ack_then_verify(self): 173 """ 174 Send an NAK followed by an ACK then verify client IP address. 175 176 Ask shill to disconnect and reconnect the Service for our 177 virtual Ethernet link. This causes shill to shut down and 178 restart dhcpcd for the link. 179 180 On reconnect, perform a test where the server responds to a 181 REQUEST with a NAK followed by an ACK. This method asserts 182 that the client does not DECLINE this address. 183 """ 184 service = self.find_ethernet_service(self.interface_name) 185 service.Disconnect() 186 187 # This rule serves two purposes: First it asserts that the client 188 # does not send a DECLINE response. Second, it waits until the 189 # test timeout, by which time client will have completed an "ARP 190 # self" operation to validate the offered IP adddres. 191 decline_rule = dhcp_handling_rule.DhcpHandlingRule_AcceptDecline( 192 self.intended_ip, self.dhcp_options, {}) 193 194 rules = [ 195 # Respond to DISCOVERY, but then both NAK then ACK the REQUEST, 196 # supplying the server's own IP address. 197 dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 198 self.intended_ip, self.server_ip, self.dhcp_options, {}), 199 dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest( 200 self.intended_ip, self.server_ip, self.dhcp_options, {}, 201 True), 202 decline_rule 203 ] 204 rules[-1].is_final_handler = True 205 self.server.start_test( 206 rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS) 207 service.Connect() 208 self.server.wait_for_test_to_finish() 209 210 # This is a negative test, since we expect the last rule to fail. 211 if self.server.last_test_passed: 212 raise error.TestFail('DHCP DECLINE message was received') 213 elif self.server.current_rule != decline_rule: 214 raise error.TestFail('Failed on %s rule' % self.server.current_rule) 215 216 dhcp_config = self.get_interface_ipconfig( 217 self.ethernet_pair.peer_interface_name) 218 if dhcp_config is None: 219 raise error.TestFail('Did not get a DHCP config') 220 if dhcp_config[dhcp_test_base.DHCPCD_KEY_ADDRESS] != self.intended_ip: 221 raise error.TestFail('Client did not attain expected address %s' % 222 self.intended_ip) 223 224 def test_body(self): 225 """ 226 Entry point for this test. 227 228 This is called from DhcpTestBase.run_once(). 229 """ 230 self.common_setup() 231 for sub_test in (self.reconnect_service, 232 self.force_dhcp_renew, 233 self.send_ack_then_nak, 234 self.send_nak_then_ack_with_conflict): 235 sub_test() 236 self.server.wait_for_test_to_finish() 237 if not self.server.last_test_passed: 238 raise error.TestFail('Test failed (%s): active rule is %s' % ( 239 sub_test.__name__, self.server.current_rule)) 240 241 # This method is outside the loop above since it performs its own 242 # special verification. 243 self.send_nak_then_ack_then_verify() 244