Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """
      6 Base class for DHCP tests.  This class just sets up a little bit of plumbing,
      7 like a virtual ethernet device with one end that looks like a real ethernet
      8 device to shill and a DHCP test server on the end that doesn't look like a real
      9 ethernet interface to shill.  Child classes should override test_body() with the
     10 logic of their test.  The plumbing of DhcpTestBase is accessible via properties.
     11 """
     12 
     13 import logging
     14 import socket
     15 import struct
     16 import time
     17 import traceback
     18 
     19 from autotest_lib.client.bin import test
     20 from autotest_lib.client.common_lib import error
     21 from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
     22 from autotest_lib.client.cros import dhcp_handling_rule
     23 from autotest_lib.client.cros import dhcp_packet
     24 from autotest_lib.client.cros import dhcp_test_server
     25 from autotest_lib.client.cros.networking import shill_proxy
     26 
     27 
     28 # These are keys that may be used with the DBus dictionary returned from
     29 # DhcpTestBase.get_interface_ipconfig().
     30 DHCPCD_KEY_NAMESERVERS = 'NameServers'
     31 DHCPCD_KEY_GATEWAY = 'Gateway'
     32 DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast'
     33 DHCPCD_KEY_ADDRESS = 'Address'
     34 DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen'
     35 DHCPCD_KEY_DOMAIN_NAME = 'DomainName'
     36 DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname'
     37 DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'
     38 
     39 # We should be able to complete a DHCP negotiation in this amount of time.
     40 DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10
     41 
     42 # After DHCP completes, an ipconfig should appear shortly after
     43 IPCONFIG_POLL_COUNT = 5
     44 IPCONFIG_POLL_PERIOD_SECONDS = 0.5
     45 
     46 class DhcpTestBase(test.test):
     47     """Parent class for tests that work verify DHCP behavior."""
     48     version = 1
     49 
     50     @staticmethod
     51     def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
     52         """
     53         Create a new IPv4 address in a subnet by bitwise and'ing an existing
     54         address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
     55         |ip_suffix|.  For safety, bitwise or the suffix with the complement of
     56         the subnet mask.
     57 
     58         Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")
     59 
     60         The example usage will return "192.168.1.105".
     61 
     62         @param subnet_mask string subnet mask, e.g. "255.255.255.0"
     63         @param ip_in_subnet string an IP address in the desired subnet
     64         @param ip_suffix string suffix desired for new address, e.g. "0.0.0.105"
     65 
     66         @return string IP address on in the same subnet with specified suffix.
     67 
     68         """
     69         mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0]
     70         subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0]
     71         suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0]
     72         return socket.inet_ntoa(struct.pack('!I', (subnet | suffix)))
     73 
     74 
     75     def get_device(self, interface_name):
     76         """Finds the corresponding Device object for an interface with
     77         the name |interface_name|.
     78 
     79         @param interface_name string The name of the interface to check.
     80 
     81         @return DBus interface object representing the associated device.
     82 
     83         """
     84         return self.shill_proxy.find_object('Device',
     85                                             {'Name': interface_name})
     86 
     87 
     88     def find_ethernet_service(self, interface_name):
     89         """Finds the corresponding service object for an Ethernet interface.
     90 
     91         @param interface_name string The name of the associated interface
     92 
     93         @return Service object representing the associated service.
     94 
     95         """
     96         device = self.get_device(interface_name)
     97         device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
     98         return self.shill_proxy.find_object('Service', {'Device': device_path})
     99 
    100 
    101     def get_interface_ipconfig_objects(self, interface_name):
    102         """
    103         Returns a list of dbus object proxies for |interface_name|.
    104         Returns an empty list if no such interface exists.
    105 
    106         @param interface_name string name of the device to query (e.g., "eth0").
    107 
    108         @return list of objects representing DBus IPConfig RPC endpoints.
    109 
    110         """
    111         device = self.get_device(interface_name)
    112         if device is None:
    113             return []
    114 
    115         device_properties = device.GetProperties(utf8_strings=True)
    116         proxy = self.shill_proxy
    117 
    118         ipconfig_object = proxy.DBUS_TYPE_IPCONFIG
    119         return filter(bool,
    120                       [ proxy.get_dbus_object(ipconfig_object, property_path)
    121                         for property_path in device_properties['IPConfigs'] ])
    122 
    123 
    124     def get_interface_ipconfig(self, interface_name):
    125         """
    126         Returns a dictionary containing settings for an |interface_name| set
    127         via DHCP.  Returns None if no such interface or setting bundle on
    128         that interface can be found in shill.
    129 
    130         @param interface_name string name of the device to query (e.g., "eth0").
    131 
    132         @return dict containing the the properties of the IPConfig stripped
    133             of DBus meta-data or None.
    134 
    135         """
    136         dhcp_properties = None
    137         for ipconfig in self.get_interface_ipconfig_objects(interface_name):
    138           logging.info('Looking at ipconfig %r', ipconfig)
    139           ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
    140           if 'Method' not in ipconfig_properties:
    141               logging.info('Found ipconfig object with no method field')
    142               continue
    143           if ipconfig_properties['Method'] != 'dhcp':
    144               logging.info('Found ipconfig object with method != dhcp')
    145               continue
    146           if dhcp_properties != None:
    147               raise error.TestFail('Found multiple ipconfig objects '
    148                                    'with method == dhcp')
    149           dhcp_properties = ipconfig_properties
    150         if dhcp_properties is None:
    151             logging.info('Did not find IPConfig object with method == dhcp')
    152             return None
    153         logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
    154         return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)
    155 
    156 
    157     def run_once(self):
    158         self._server = None
    159         self._server_ip = None
    160         self._ethernet_pair = None
    161         self._server = None
    162         self._shill_proxy = shill_proxy.ShillProxy()
    163         try:
    164             self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
    165                     peer_interface_name='pseudoethernet0',
    166                     peer_interface_ip=None)
    167             self._ethernet_pair.setup()
    168             if not self._ethernet_pair.is_healthy:
    169                 raise error.TestFail('Could not create virtual ethernet pair.')
    170             self._server_ip = self._ethernet_pair.interface_ip
    171             self._server = dhcp_test_server.DhcpTestServer(
    172                     self._ethernet_pair.interface_name)
    173             self._server.start()
    174             if not self._server.is_healthy:
    175                 raise error.TestFail('Could not start DHCP test server.')
    176             self._subnet_mask = self._ethernet_pair.interface_subnet_mask
    177             self.test_body()
    178         except (error.TestFail, error.TestNAError):
    179             # Pass these through without modification.
    180             raise
    181         except Exception as e:
    182             logging.error('Caught exception: %s.', str(e))
    183             logging.error('Trace: %s', traceback.format_exc())
    184             raise error.TestFail('Caught exception: %s.' % str(e))
    185         finally:
    186             if self._server is not None:
    187                 self._server.stop()
    188             if self._ethernet_pair is not None:
    189                 self._ethernet_pair.teardown()
    190 
    191     def test_body(self):
    192         """
    193         Override this method with the body of your test.  You may safely assume
    194         that the the properties exposed by DhcpTestBase correctly return
    195         references to the test apparatus.
    196         """
    197         raise error.TestFail('No test body implemented')
    198 
    199     @property
    200     def server_ip(self):
    201         """
    202         Return the IP address of the side of the interface that the DHCP test
    203         server is bound to.  The server itself is bound the the broadcast
    204         address on the interface.
    205         """
    206         return self._server_ip
    207 
    208     @property
    209     def server(self):
    210         """
    211         Returns a reference to the DHCP test server.  Use this to add handlers
    212         and run tests.
    213         """
    214         return self._server
    215 
    216     @property
    217     def ethernet_pair(self):
    218         """
    219         Returns a reference to the virtual ethernet pair created to run DHCP
    220         tests on.
    221         """
    222         return self._ethernet_pair
    223 
    224     @property
    225     def shill_proxy(self):
    226         """
    227         Returns a the shill proxy instance.
    228         """
    229         return self._shill_proxy
    230 
    231     def negotiate_and_check_lease(self,
    232                                   dhcp_options,
    233                                   custom_fields={},
    234                                   disable_check=False):
    235         """
    236         Perform DHCP lease negotiation, and ensure that the resulting
    237         ipconfig matches the DHCP options provided to the server.
    238 
    239         @param dhcp_options dict of properties the DHCP server should provide.
    240         @param custom_fields dict of custom DHCP parameters to add to server.
    241         @param disable_check bool whether to perform IPConfig parameter
    242              checking.
    243 
    244         """
    245         if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
    246             raise error.TestFail('You must specify OPTION_REQUESTED_IP to '
    247                                  'negotiate a DHCP lease')
    248         intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]
    249         # Build up the handling rules for the server and start the test.
    250         rules = []
    251         rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
    252                 intended_ip,
    253                 self.server_ip,
    254                 dhcp_options,
    255                 custom_fields))
    256         rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
    257                 intended_ip,
    258                 self.server_ip,
    259                 dhcp_options,
    260                 custom_fields))
    261         rules[-1].is_final_handler = True
    262         self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
    263         logging.info('Server is negotiating new lease with options: %s',
    264                      dhcp_options)
    265         self.server.wait_for_test_to_finish()
    266         if not self.server.last_test_passed:
    267             raise error.TestFail(
    268                 'Test failed: active rule is %s' % self.server.current_rule)
    269 
    270         if disable_check:
    271             logging.info('Skipping check of negotiated DHCP lease parameters.')
    272         else:
    273             self.wait_for_dhcp_propagation()
    274             self.check_dhcp_config(dhcp_options)
    275 
    276     def wait_for_dhcp_propagation(self):
    277         """
    278         Wait for configuration to propagate over dbus to shill.
    279         TODO(wiley) Make this event based.  This is pretty sloppy.
    280         """
    281         time.sleep(0.1)
    282 
    283     def check_dhcp_config(self, dhcp_options):
    284         """
    285         Compare the DHCP ipconfig with DHCP lease parameters to ensure
    286         that the DUT attained the correct values.
    287 
    288         @param dhcp_options dict of properties the DHCP server provided.
    289 
    290         """
    291         # The config is what the interface was actually configured with, as
    292         # opposed to dhcp_options, which is what the server expected it be
    293         # configured with.
    294         for attempt in range(IPCONFIG_POLL_COUNT):
    295             dhcp_config = self.get_interface_ipconfig(
    296                     self.ethernet_pair.peer_interface_name)
    297             if dhcp_config is not None:
    298                 break
    299             time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
    300         else:
    301             raise error.TestFail('Failed to retrieve DHCP ipconfig object '
    302                                  'from shill.')
    303 
    304         logging.debug('Got DHCP config: %s', str(dhcp_config))
    305         expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
    306         configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
    307         if expected_address != configured_address:
    308             raise error.TestFail('Interface configured with IP address not '
    309                                  'granted by the DHCP server after DHCP '
    310                                  'negotiation.  Expected %s but got %s.' %
    311                                  (expected_address, configured_address))
    312 
    313         # While DNS related settings only propagate to the system when the
    314         # service is marked as the default service, we can still check the
    315         # IP address on the interface, since that is set immediately.
    316         interface_address = self.ethernet_pair.peer_interface_ip
    317         if expected_address != interface_address:
    318             raise error.TestFail('shill somehow knew about the proper DHCP '
    319                                  'assigned address: %s, but configured the '
    320                                  'interface with something completely '
    321                                  'different: %s.' %
    322                                  (expected_address, interface_address))
    323 
    324         expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
    325         configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
    326         if (expected_dns_servers is not None and
    327             expected_dns_servers != configured_dns_servers):
    328             raise error.TestFail('Expected to be configured with DNS server '
    329                                  'list %s, but was configured with %s '
    330                                  'instead.' % (expected_dns_servers,
    331                                                configured_dns_servers))
    332 
    333         expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
    334         configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
    335         if (expected_domain_name is not None and
    336             expected_domain_name != configured_domain_name):
    337             raise error.TestFail('Expected to be configured with domain '
    338                                  'name %s, but got %s instead.' %
    339                                  (expected_domain_name, configured_domain_name))
    340 
    341         expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME)
    342         configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME)
    343         if (expected_host_name is not None and
    344             expected_host_name != configured_host_name):
    345             raise error.TestFail('Expected to be configured with host '
    346                                  'name %s, but got %s instead.' %
    347                                  (expected_host_name, configured_host_name))
    348 
    349         expected_search_list = dhcp_options.get(
    350                 dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
    351         configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
    352         if (expected_search_list is not None and
    353             expected_search_list != configured_search_list):
    354             raise error.TestFail('Expected to be configured with domain '
    355                                  'search list %s, but got %s instead.' %
    356                                  (expected_search_list, configured_search_list))
    357 
    358         expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
    359         if (not expected_routers and
    360             dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
    361             classless_static_routes = dhcp_options[
    362                 dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
    363             for prefix, destination, gateway in classless_static_routes:
    364                 if not prefix:
    365                     logging.info('Using %s as the default gateway', gateway)
    366                     expected_routers = [ gateway ]
    367                     break
    368         configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
    369         if expected_routers and expected_routers[0] != configured_router:
    370             raise error.TestFail('Expected to be configured with gateway %s, '
    371                                  'but got %s instead.' %
    372                                  (expected_routers[0], configured_router))
    373 
    374         self.server.wait_for_test_to_finish()
    375         if not self.server.last_test_passed:
    376             raise error.TestFail('Test server didn\'t get all the messages it '
    377                                  'was told to expect for renewal.')
    378