Home | History | Annotate | Download | only in network_StressServoEthernetPlug
      1 import logging, re, time
      2 
      3 from autotest_lib.server import autotest, test, hosts
      4 from autotest_lib.server.cros import stress
      5 from autotest_lib.client.common_lib import error
      6 
      7 # Timeout duration to wait for a DHCP response.
      8 # It usually doesn't take this long, but just in case.
      9 TIMEOUT = 60
     10 
     11 class network_StressServoEthernetPlug(test.test):
     12 
     13     ETH_MAC = 'mac'
     14     ETH_IP = 'ipaddress'
     15 
     16     version = 1
     17 
     18 
     19     def initialize(self, host):
     20         self.host = host
     21         self.host_iface = None
     22         self.servo_iface = None
     23         self.servo_eth_up()
     24 
     25         end_time = time.time() + TIMEOUT
     26         while time.time() < end_time:
     27             self.eth_interfaces = self.get_eth_interfaces()
     28             if len(self.eth_interfaces) >= 2:
     29                 break
     30             time.sleep(1)
     31 
     32         # Assuming 2 ethernet interfaces, the interface not for host
     33         # is that associated with servo.
     34         for iface, eth_dict in self.eth_interfaces.iteritems():
     35             if eth_dict[self.ETH_IP] == self.host.hostname:
     36                 self.host_iface = iface
     37             else:
     38                 self.servo_iface = iface
     39 
     40         if not self.servo_iface:
     41             raise error.TestError('Cannot find servo ethernet interface')
     42 
     43         logging.info('Servo eth: %s', self.servo_iface)
     44         logging.info('Host eth: %s', self.host_iface)
     45 
     46 
     47     def servo_eth_up(self):
     48         logging.info('Bringing up ethernet')
     49         self.host.servo.set('dut_hub_on', 'yes')
     50 
     51 
     52     def servo_eth_down(self):
     53         logging.info('Bringing down ethernet')
     54         self.host.servo.set('dut_hub_on', 'no')
     55 
     56 
     57     def get_eth_interfaces(self):
     58         """ Gets the ethernet device object.
     59 
     60         Returns:
     61             A dictionary of ethernet devices.
     62             {
     63                 'eth<x>':
     64                 {
     65                     'mac': <mac address>,
     66                     'ipaddress': <ipaddress>,
     67                 }
     68             }
     69         """
     70         results = self.host.run('ifconfig').stdout.split('\n')
     71         eth_dict = {}
     72 
     73         iterator = results.__iter__()
     74         for line in iterator:
     75             # Search for the beginning of an interface section.
     76             iface_start = re.search('^(eth\S+)\s+Link encap:Ethernet\s+HWaddr'
     77                                     '\s+(\S+)', line)
     78             if iface_start:
     79                 (iface, hwaddr) = iface_start.groups()
     80                 line = iterator.next()
     81                 result = re.search('^\s+inet addr:(\S+)\s+', line)
     82                 ipaddress = None
     83                 if result:
     84                     ipaddress = result.groups()[0]
     85                 eth_dict[iface] = {self.ETH_MAC: hwaddr, self.ETH_IP: ipaddress}
     86         return eth_dict
     87 
     88 
     89     def verify_eth_status(self, up_list, timeout=TIMEOUT):
     90         """ Verify the up_list ifaces are up (and its contrapositive). """
     91         end_time = time.time() + timeout
     92         interfaces = {}
     93         while time.time() < end_time:
     94             interfaces = self.get_eth_interfaces()
     95             error_message = ('Expected eth status %s but instead got %s' %
     96                              (up_list, interfaces.keys()))
     97             if set(interfaces.keys()) == set(up_list):
     98                 # Check to make sure all the interfaces are up.
     99                 for iface, eth_dict in interfaces.iteritems():
    100                     if not eth_dict[self.ETH_IP]:
    101                         error_message = ('Ethernet interface %s did not '
    102                                          'receive address.' % iface)
    103                         break
    104                 else:
    105                     # All desired interfaces are up, and they have ip addresses.
    106                     break
    107             time.sleep(1)
    108         else:
    109             # If the while loop terminates without interruption, we've timed out
    110             # waiting for the interface.
    111             raise error.TestFail(error_message)
    112 
    113 
    114     def run_once(self, num_iterations=1):
    115         for iteration in range(num_iterations):
    116             logging.info('Executing iteration %d', iteration)
    117             self.servo_eth_down()
    118             self.verify_eth_status([self.host_iface])
    119             self.servo_eth_up()
    120             self.verify_eth_status([self.host_iface, self.servo_iface])
    121