# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Base class for DHCP tests.  This class just sets up a little bit of plumbing,
like a virtual ethernet device with one end that looks like a real ethernet
device to shill and a DHCP test server on the end that doesn't look like a real
ethernet interface to shill.  Child classes should override test_body() with the
logic of their test.  The plumbing of DhcpTestBase is accessible via properties.
"""

import logging
import socket
import struct
import time
import traceback

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_server
from autotest_lib.client.cros.networking import shill_proxy


# These are keys that may be used with the DBus dictionary returned from
# DhcpTestBase.get_interface_ipconfig().
DHCPCD_KEY_NAMESERVERS = 'NameServers'
DHCPCD_KEY_GATEWAY = 'Gateway'
DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast'
DHCPCD_KEY_ADDRESS = 'Address'
DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen'
DHCPCD_KEY_DOMAIN_NAME = 'DomainName'
DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname'
DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'

# We should be able to complete a DHCP negotiation in this amount of time.
DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10

# After DHCP completes, an ipconfig should appear shortly after.
IPCONFIG_POLL_COUNT = 5
IPCONFIG_POLL_PERIOD_SECONDS = 0.5


class DhcpTestBase(test.test):
    """Parent class for tests that verify DHCP behavior."""
    version = 1

    @staticmethod
    def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
        """
        Create a new IPv4 address in a subnet by bitwise and'ing an existing
        address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
        |ip_suffix|.  For safety, the suffix is first and'ed with the
        complement of the subnet mask so it cannot disturb the network bits.

        Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")

        The example usage will return "192.168.1.105".

        @param subnet_mask string subnet mask, e.g. "255.255.255.0"
        @param ip_in_subnet string an IP address in the desired subnet
        @param ip_suffix string suffix desired for new address, e.g. "0.0.0.105"

        @return string IP address in the same subnet with specified suffix.

        """
        mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0]
        subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0]
        # ~mask is a negative Python int, but and'ing it with a non-negative
        # 32-bit value yields a non-negative result that packs cleanly.
        suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0]
        return socket.inet_ntoa(struct.pack('!I', (subnet | suffix)))


    def get_device(self, interface_name):
        """Finds the corresponding Device object for an interface with
        the name |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device.
                Callers in this class treat a None result as "no such device".

        """
        return self.shill_proxy.find_object('Device',
                                            {'Name': interface_name})


    def find_ethernet_service(self, interface_name):
        """Finds the corresponding service object for an Ethernet interface.

        @param interface_name string The name of the associated interface

        @return Service object representing the associated service, or None
                if no device named |interface_name| could be found.

        """
        device = self.get_device(interface_name)
        if device is None:
            # Mirror get_interface_ipconfig_objects(): a missing device is
            # reported as "nothing found" rather than an AttributeError.
            return None

        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
        return self.shill_proxy.find_object('Service', {'Device': device_path})


    def get_interface_ipconfig_objects(self, interface_name):
        """
        Returns a list of dbus object proxies for |interface_name|.
        Returns an empty list if no such interface exists.

        @param interface_name string name of the device to query (e.g., "eth0").

        @return list of objects representing DBus IPConfig RPC endpoints.

        """
        device = self.get_device(interface_name)
        if device is None:
            return []

        device_properties = device.GetProperties(utf8_strings=True)
        proxy = self.shill_proxy

        ipconfig_object = proxy.DBUS_TYPE_IPCONFIG
        # Build a real list (not a lazy filter object) and drop falsy proxies
        # so the documented return type holds on every Python version.
        candidates = [proxy.get_dbus_object(ipconfig_object, property_path)
                      for property_path in device_properties['IPConfigs']]
        return [candidate for candidate in candidates if candidate]


    def get_interface_ipconfig(self, interface_name):
        """
        Returns a dictionary containing settings for an |interface_name| set
        via DHCP.  Returns None if no such interface or setting bundle on
        that interface can be found in shill.

        @param interface_name string name of the device to query (e.g., "eth0").

        @return dict containing the properties of the IPConfig stripped
                of DBus meta-data or None.

        @raises error.TestFail if more than one IPConfig has method == dhcp.

        """
        dhcp_properties = None
        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
            logging.info('Looking at ipconfig %r', ipconfig)
            ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
            if 'Method' not in ipconfig_properties:
                logging.info('Found ipconfig object with no method field')
                continue
            if ipconfig_properties['Method'] != 'dhcp':
                logging.info('Found ipconfig object with method != dhcp')
                continue
            if dhcp_properties is not None:
                raise error.TestFail('Found multiple ipconfig objects '
                                     'with method == dhcp')
            dhcp_properties = ipconfig_properties
        if dhcp_properties is None:
            logging.info('Did not find IPConfig object with method == dhcp')
            return None
        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)


    def run_once(self):
        """
        Set up the virtual ethernet pair and DHCP test server, run
        test_body(), and always tear the apparatus down afterwards.

        @raises error.TestFail if setup fails or test_body() raises anything
                other than error.TestFail / error.TestNAError (which pass
                through unchanged).

        """
        self._server = None
        self._server_ip = None
        self._subnet_mask = None
        self._ethernet_pair = None
        self._shill_proxy = shill_proxy.ShillProxy()
        try:
            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
                    peer_interface_name='pseudoethernet0',
                    peer_interface_ip=None)
            self._ethernet_pair.setup()
            if not self._ethernet_pair.is_healthy:
                raise error.TestFail('Could not create virtual ethernet pair.')
            self._server_ip = self._ethernet_pair.interface_ip
            self._server = dhcp_test_server.DhcpTestServer(
                    self._ethernet_pair.interface_name)
            self._server.start()
            if not self._server.is_healthy:
                raise error.TestFail('Could not start DHCP test server.')
            self._subnet_mask = self._ethernet_pair.interface_subnet_mask
            self.test_body()
        except (error.TestFail, error.TestNAError):
            # Pass these through without modification.
            raise
        except Exception as e:
            logging.error('Caught exception: %s.', str(e))
            logging.error('Trace: %s', traceback.format_exc())
            raise error.TestFail('Caught exception: %s.' % str(e))
        finally:
            try:
                if self._server is not None:
                    self._server.stop()
            finally:
                # Tear the pair down even if stopping the server raised, so
                # the virtual interface is not leaked onto the DUT.
                if self._ethernet_pair is not None:
                    self._ethernet_pair.teardown()

    def test_body(self):
        """
        Override this method with the body of your test.  You may safely assume
        that the properties exposed by DhcpTestBase correctly return
        references to the test apparatus.
        """
        raise error.TestFail('No test body implemented')

    @property
    def server_ip(self):
        """
        Return the IP address of the side of the interface that the DHCP test
        server is bound to.  The server itself is bound to the broadcast
        address on the interface.
        """
        return self._server_ip

    @property
    def server(self):
        """
        Returns a reference to the DHCP test server.  Use this to add handlers
        and run tests.
        """
        return self._server

    @property
    def ethernet_pair(self):
        """
        Returns a reference to the virtual ethernet pair created to run DHCP
        tests on.
        """
        return self._ethernet_pair

    @property
    def shill_proxy(self):
        """
        Returns the shill proxy instance.
        """
        return self._shill_proxy

    def negotiate_and_check_lease(self,
                                  dhcp_options,
                                  custom_fields=None,
                                  disable_check=False):
        """
        Perform DHCP lease negotiation, and ensure that the resulting
        ipconfig matches the DHCP options provided to the server.

        @param dhcp_options dict of properties the DHCP server should provide.
        @param custom_fields dict of custom DHCP parameters to add to server.
                None (the default) means no custom fields.
        @param disable_check bool True to skip the IPConfig parameter check
                after negotiation.

        @raises error.TestFail if OPTION_REQUESTED_IP is missing from
                |dhcp_options|, the server-side test fails, or the resulting
                configuration does not match.

        """
        # A fresh dict per call; a shared {} default could leak state between
        # invocations if anything downstream mutated it.
        if custom_fields is None:
            custom_fields = {}
        if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
            raise error.TestFail('You must specify OPTION_REQUESTED_IP to '
                                 'negotiate a DHCP lease')
        intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]
        # Build up the handling rules for the server and start the test.
        rules = []
        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
                intended_ip,
                self.server_ip,
                dhcp_options,
                custom_fields))
        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
                intended_ip,
                self.server_ip,
                dhcp_options,
                custom_fields))
        rules[-1].is_final_handler = True
        self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
        logging.info('Server is negotiating new lease with options: %s',
                     dhcp_options)
        self.server.wait_for_test_to_finish()
        if not self.server.last_test_passed:
            raise error.TestFail(
                'Test failed: active rule is %s' % self.server.current_rule)

        if disable_check:
            logging.info('Skipping check of negotiated DHCP lease parameters.')
        else:
            self.wait_for_dhcp_propagation()
            self.check_dhcp_config(dhcp_options)

    def wait_for_dhcp_propagation(self):
        """
        Wait for configuration to propagate over dbus to shill.
        TODO(wiley) Make this event based.  This is pretty sloppy.
        """
        time.sleep(0.1)

    def check_dhcp_config(self, dhcp_options):
        """
        Compare the DHCP ipconfig with DHCP lease parameters to ensure
        that the DUT attained the correct values.

        @param dhcp_options dict of properties the DHCP server provided.

        @raises error.TestFail if the ipconfig never appears or any checked
                setting differs from |dhcp_options|.

        """
        # The config is what the interface was actually configured with, as
        # opposed to dhcp_options, which is what the server expected it be
        # configured with.
        for _ in range(IPCONFIG_POLL_COUNT):
            dhcp_config = self.get_interface_ipconfig(
                    self.ethernet_pair.peer_interface_name)
            if dhcp_config is not None:
                break
            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
        else:
            # The loop ran out of attempts without ever hitting `break`.
            raise error.TestFail('Failed to retrieve DHCP ipconfig object '
                                 'from shill.')

        logging.debug('Got DHCP config: %s', str(dhcp_config))
        expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
        configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
        if expected_address != configured_address:
            raise error.TestFail('Interface configured with IP address not '
                                 'granted by the DHCP server after DHCP '
                                 'negotiation.  Expected %s but got %s.' %
                                 (expected_address, configured_address))

        # While DNS related settings only propagate to the system when the
        # service is marked as the default service, we can still check the
        # IP address on the interface, since that is set immediately.
        interface_address = self.ethernet_pair.peer_interface_ip
        if expected_address != interface_address:
            raise error.TestFail('shill somehow knew about the proper DHCP '
                                 'assigned address: %s, but configured the '
                                 'interface with something completely '
                                 'different: %s.' %
                                 (expected_address, interface_address))

        # Each optional setting below is only verified when the server was
        # actually asked to provide it.
        expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
        configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
        if (expected_dns_servers is not None and
            expected_dns_servers != configured_dns_servers):
            raise error.TestFail('Expected to be configured with DNS server '
                                 'list %s, but was configured with %s '
                                 'instead.' % (expected_dns_servers,
                                               configured_dns_servers))

        expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
        configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
        if (expected_domain_name is not None and
            expected_domain_name != configured_domain_name):
            raise error.TestFail('Expected to be configured with domain '
                                 'name %s, but got %s instead.' %
                                 (expected_domain_name, configured_domain_name))

        expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME)
        configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME)
        if (expected_host_name is not None and
            expected_host_name != configured_host_name):
            raise error.TestFail('Expected to be configured with host '
                                 'name %s, but got %s instead.' %
                                 (expected_host_name, configured_host_name))

        expected_search_list = dhcp_options.get(
                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
        configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
        if (expected_search_list is not None and
            expected_search_list != configured_search_list):
            raise error.TestFail('Expected to be configured with domain '
                                 'search list %s, but got %s instead.' %
                                 (expected_search_list, configured_search_list))

        expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
        if (not expected_routers and
            dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
            # No explicit router option: fall back to the first classless
            # static route with an empty/zero prefix as the default gateway.
            classless_static_routes = dhcp_options[
                    dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
            for prefix, destination, gateway in classless_static_routes:
                if not prefix:
                    logging.info('Using %s as the default gateway', gateway)
                    expected_routers = [ gateway ]
                    break
        configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
        if expected_routers and expected_routers[0] != configured_router:
            raise error.TestFail('Expected to be configured with gateway %s, '
                                 'but got %s instead.' %
                                 (expected_routers[0], configured_router))

        # NOTE(review): the failure text below refers to renewal; presumably
        # this re-check covers rules queued beyond the initial negotiation --
        # confirm against DhcpTestServer.
        self.server.wait_for_test_to_finish()
        if not self.server.last_test_passed:
            raise error.TestFail('Test server didn\'t get all the messages it '
                                 'was told to expect for renewal.')