Home | History | Annotate | Download | only in rpm_control_system
      1 #!/usr/bin/python
      2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import errno
      7 import heapq
      8 import logging
      9 import os
     10 import sys
     11 import socket
     12 import threading
     13 import xmlrpclib
     14 
     15 import rpm_logging_config
     16 from config import rpm_config
     17 from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
     18 from rpm_infrastructure_exception import RPMInfrastructureException
     19 
     20 import common
     21 from autotest_lib.server import frontend
     22 from autotest_lib.site_utils.rpm_control_system import utils
     23 
# RPM count given to a dispatcher's heap entry when it registers.
DEFAULT_RPM_COUNT = 0
# RPM count written into a heap entry when its dispatcher is unregistered;
# being lower than any real count, it sorts such entries to the top of the
# min heap so they can be popped off.
TERMINATED = -1

# Indexes for accessing heap entries, which are lists of the form
# [rpm_count, dispatcher_uri].
RPM_COUNT = 0
DISPATCHER_URI = 1

# Log file name format handed to rpm_logging_config when the server is
# launched as a script.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','frontend_logname_format')
# NOTE(review): not referenced anywhere in this file; presumably kept for
# other modules that import it -- confirm before removing.
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Valid state values. Requested states are upper-cased before being checked
# against this list.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file, resolved relative to this module's directory.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
     45 
     46 
     47 class RPMFrontendServer(object):
     48     """
     49     This class is the frontend server of the RPM Infrastructure. All clients
     50     will send their power state requests to this central server who will
     51     forward the requests to an avaliable or already assigned RPM dispatcher
     52     server.
     53 
     54     Once the dispatcher processes the request it will return the result
     55     to this frontend server who will send the result back to the client.
     56 
     57     All calls to this server are blocking.
     58 
     59     @var _dispatcher_minheap: Min heap that returns a list of format-
     60                               [ num_rpm's, dispatcher_uri ]
     61                               Used to choose the least loaded dispatcher.
     62     @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
     63                      heap. If a dispatcher server shuts down this allows us to
     64                      invalidate the entry in the minheap.
     65     @var _lock: Used to protect data from multiple running threads all
     66                 manipulating the same data.
     67     @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
     68                     server.
     69     @var _mapping_last_modified: Last-modified time of the servo-interface
     70                                  mapping file.
     71     @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
     72     @var _rpm_info: An LRU cache to hold recently visited rpm information
     73                     so that we don't hit AFE too often. The elements in
     74                     the cache are instances of PowerUnitInfo indexed by
     75                     dut hostnames. POE info is not stored in the cache.
     76     @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
     77     @var _email_handler: Email handler to use to control email notifications.
     78     """
     79 
     80 
     81     def __init__(self, email_handler=None):
     82         """
     83         RPMFrontendServer constructor.
     84 
     85         Initializes instance variables.
     86         """
     87         self._dispatcher_minheap = []
     88         self._entry_dict = {}
     89         self._lock = threading.Lock()
     90         self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
     91         self._servo_interface = utils.load_servo_interface_mapping()
     92         self._rpm_dict = {}
     93         self._afe = frontend.AFE()
     94         self._rpm_info = utils.LRUCache(size=LRU_SIZE)
     95         self._email_handler = email_handler
     96 
     97 
     98     def queue_request(self, device_hostname, new_state):
     99         """
    100         Forwards a request to change a device's (a dut or a servo) power state
    101         to the appropriate dispatcher server.
    102 
    103         This call will block until the forwarded request returns.
    104 
    105         @param device_hostname: Hostname of the device whose power state we want to
    106                              change.
    107         @param new_state: [ON, OFF, CYCLE] State to which we want to set the
    108                           device's outlet to.
    109 
    110         @return: True if the attempt to change power state was successful,
    111                  False otherwise.
    112 
    113         @raise RPMInfrastructureException: No dispatchers are available or can
    114                                            be reached.
    115         """
    116         # Remove any DNS Zone information and simplify down to just the hostname.
    117         device_hostname = device_hostname.split('.')[0]
    118         new_state = new_state.upper()
    119         # Put new_state in all uppercase letters
    120         if new_state not in VALID_STATE_VALUES:
    121             logging.error('Received request to set device %s to invalid '
    122                           'state %s', device_hostname, new_state)
    123             return False
    124         logging.info('Received request to set device: %s to state: %s',
    125                      device_hostname, new_state)
    126         powerunit_info = self._get_powerunit_info(device_hostname)
    127         dispatcher_uri = self._get_dispatcher(powerunit_info)
    128         if not dispatcher_uri:
    129             # No dispatchers available.
    130             raise RPMInfrastructureException('No dispatchers available.')
    131         client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
    132         try:
    133             # Block on the request and return the result once it arrives.
    134             return client.queue_request(powerunit_info, new_state)
    135         except socket.error as er:
    136             # Dispatcher Server is not reachable. Unregister it and retry.
    137             logging.error("Can't reach Dispatch Server: %s. Error: %s",
    138                           dispatcher_uri, errno.errorcode[er.errno])
    139             if self.is_network_infrastructure_down():
    140                 # No dispatchers can handle this request so raise an Exception
    141                 # to the caller.
    142                 raise RPMInfrastructureException('No dispatchers can be'
    143                                                  'reached.')
    144             logging.info('Will attempt forwarding request to other dispatch '
    145                          'servers.')
    146             logging.error('Unregistering %s due to error. Recommend resetting '
    147                           'that dispatch server.', dispatcher_uri)
    148             self.unregister_dispatcher(dispatcher_uri)
    149             # Retry forwarding the request.
    150             return self.queue_request(device_hostname, new_state)
    151 
    152 
    153     def is_network_infrastructure_down(self):
    154         """
    155         Check to see if we can communicate with any dispatcher servers.
    156 
    157         Only called in the situation that queuing a request to a dispatcher
    158         server failed.
    159 
    160         @return: False if any dispatcher server is up and the rpm infrastructure
    161                  can still function. True otherwise.
    162         """
    163         for dispatcher_entry in self._dispatcher_minheap:
    164             dispatcher = xmlrpclib.ServerProxy(
    165                     dispatcher_entry[DISPATCHER_URI], allow_none=True)
    166             try:
    167                 if dispatcher.is_up():
    168                     # Atleast one dispatcher is alive so our network is fine.
    169                     return False
    170             except socket.error:
    171                 # Can't talk to this dispatcher so keep looping.
    172                 pass
    173         logging.error("Can't reach any dispatchers. Check frontend network "
    174                       'status or all dispatchers are down.')
    175         return True
    176 
    177 
    178     def _get_powerunit_info(self, device_hostname):
    179         """Get the power management unit information for a device.
    180 
    181         A device could be a chromeos dut or a servo.
    182         1) ChromeOS dut
    183         Chromeos dut is managed by RPM. The related information
    184         we need to know include rpm hostname, rpm outlet, hydra hostname.
    185         Such information can be retrieved from afe_host_attributes table
    186         from afe. A local LRU cache is used avoid hitting afe too often.
    187 
    188         2) Servo
    189         Servo is managed by POE. The related information we need to know
    190         include poe hostname, poe interface. Such information is
    191         stored in a local file and read into memory.
    192 
    193         @param device_hostname: A string representing the device's hostname.
    194 
    195         @returns: A PowerUnitInfo object.
    196         @raises RPMInfrastructureException if failed to get the power
    197                 unit info.
    198 
    199         """
    200         with self._lock:
    201             if device_hostname.endswith('servo'):
    202                 # Servos are managed by Cisco POE switches.
    203                 reload_info = utils.reload_servo_interface_mapping_if_necessary(
    204                         self._mapping_last_modified)
    205                 if reload_info:
    206                     self._mapping_last_modified, self._servo_interface = reload_info
    207                 switch_if_tuple = self._servo_interface.get(device_hostname)
    208                 if not switch_if_tuple:
    209                     raise RPMInfrastructureException(
    210                             'Could not determine POE hostname for %s. '
    211                             'Please check the servo-interface mapping file.',
    212                             device_hostname)
    213                 else:
    214                     return utils.PowerUnitInfo(
    215                             device_hostname=device_hostname,
    216                             powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
    217                             powerunit_hostname=switch_if_tuple[0],
    218                             outlet=switch_if_tuple[1],
    219                             hydra_hostname=None)
    220             else:
    221                 # Regular DUTs are managed by RPMs.
    222                 if device_hostname in self._rpm_info:
    223                     return self._rpm_info[device_hostname]
    224                 else:
    225                     hosts = self._afe.get_hosts(hostname=device_hostname)
    226                     if not hosts:
    227                         raise RPMInfrastructureException(
    228                                 'Can not retrieve rpm information '
    229                                 'from AFE for %s, no host found.' % device_hostname)
    230                     else:
    231                         info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
    232                         self._rpm_info[device_hostname] = info
    233                         return info
    234 
    235 
    236     def _get_dispatcher(self, powerunit_info):
    237         """
    238         Private method that looks up or assigns a dispatcher server
    239         responsible for communicating with the given RPM/POE.
    240 
    241         Will also call _check_dispatcher to make sure it is up before returning
    242         it.
    243 
    244         @param powerunit_info: A PowerUnitInfo instance.
    245 
    246         @return: URI of dispatcher server responsible for the rpm/poe.
    247                  None if no dispatcher servers are available.
    248         """
    249         powerunit_type = powerunit_info.powerunit_type
    250         powerunit_hostname = powerunit_info.powerunit_hostname
    251         with self._lock:
    252             if self._rpm_dict.get(powerunit_hostname):
    253                 return self._rpm_dict[powerunit_hostname]
    254             logging.info('No Dispatcher assigned for %s %s.',
    255                          powerunit_type, powerunit_hostname)
    256             # Choose the least loaded dispatcher to communicate with the RPM.
    257             try:
    258                 heap_entry = heapq.heappop(self._dispatcher_minheap)
    259             except IndexError:
    260                 logging.error('Infrastructure Error: Frontend has no'
    261                               'registered dispatchers to field out this '
    262                               'request!')
    263                 return None
    264             dispatcher_uri = heap_entry[DISPATCHER_URI]
    265             # Put this entry back in the heap with an RPM Count + 1.
    266             heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
    267             heapq.heappush(self._dispatcher_minheap, heap_entry)
    268             logging.info('Assigning %s for %s %s', dispatcher_uri,
    269                          powerunit_type, powerunit_hostname)
    270             self._rpm_dict[powerunit_hostname] = dispatcher_uri
    271             return dispatcher_uri
    272 
    273 
    274     def register_dispatcher(self, dispatcher_uri):
    275         """
    276         Called by a dispatcher server so that the frontend server knows it is
    277         available to field out RPM requests.
    278 
    279         Adds an entry to the min heap and entry map for this dispatcher.
    280 
    281         @param dispatcher_uri: Address of dispatcher server we are registering.
    282         """
    283         logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
    284         with self._lock:
    285             heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
    286             heapq.heappush(self._dispatcher_minheap, heap_entry)
    287             self._entry_dict[dispatcher_uri] = heap_entry
    288 
    289 
    290     def unregister_dispatcher(self, uri_to_unregister):
    291         """
    292         Called by a dispatcher server as it exits so that the frontend server
    293         knows that it is no longer available to field out requests.
    294 
    295         Assigns an rpm count of -1 to this dispatcher so that it will be pushed
    296         out of the min heap.
    297 
    298         Removes from _rpm_dict all entries with the value of this dispatcher so
    299         that those RPM's can be reassigned to a new dispatcher.
    300 
    301         @param uri_to_unregister: Address of dispatcher server we are
    302                                   unregistering.
    303         """
    304         logging.info('Unregistering uri: %s as a rpm dispatcher.',
    305                      uri_to_unregister)
    306         with self._lock:
    307             heap_entry = self._entry_dict.get(uri_to_unregister)
    308             if not heap_entry:
    309                 logging.warning('%s was not registered.', uri_to_unregister)
    310                 return
    311             # Set this entry's RPM_COUNT to TERMINATED (-1).
    312             heap_entry[RPM_COUNT] = TERMINATED
    313             # Remove all RPM mappings.
    314             for rpm, dispatcher in self._rpm_dict.items():
    315                 if dispatcher == uri_to_unregister:
    316                     self._rpm_dict[rpm] = None
    317             self._entry_dict[uri_to_unregister] = None
    318             # Re-sort the heap and remove any terminated dispatchers.
    319             heapq.heapify(self._dispatcher_minheap)
    320             self._remove_terminated_dispatchers()
    321 
    322 
    323     def _remove_terminated_dispatchers(self):
    324         """
    325         Peek at the head of the heap and keep popping off values until there is
    326         a non-terminated dispatcher at the top.
    327         """
    328         # Heapq guarantees the head of the heap is in the '0' index.
    329         try:
    330             # Peek at the next element in the heap.
    331             top_of_heap = self._dispatcher_minheap[0]
    332             while top_of_heap[RPM_COUNT] is TERMINATED:
    333                 # Pop off the top element.
    334                 heapq.heappop(self._dispatcher_minheap)
    335                 # Peek at the next element in the heap.
    336                 top_of_heap = self._dispatcher_minheap[0]
    337         except IndexError:
    338             # No more values in the heap. Can be thrown by both minheap[0]
    339             # statements.
    340             pass
    341 
    342 
    343     def suspend_emails(self, hours):
    344         """Suspend email notifications.
    345 
    346         @param hours: How many hours to suspend email notifications.
    347         """
    348         if self._email_handler:
    349             self._email_handler.suspend_emails(hours)
    350 
    351 
    352     def resume_emails(self):
    353         """Resume email notifications."""
    354         if self._email_handler:
    355             self._email_handler.resume_emails()
    356 
    357 
    358 if __name__ == '__main__':
    359     """
    360     Main function used to launch the frontend server. Creates an instance of
    361     RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer instance.
    362     """
    363     if len(sys.argv) != 2:
    364       print 'Usage: ./%s <log_file_dir>.' % sys.argv[0]
    365       sys.exit(1)
    366 
    367     email_handler = rpm_logging_config.set_up_logging_to_file(
    368             sys.argv[1], LOG_FILENAME_FORMAT)
    369     frontend_server = RPMFrontendServer(email_handler=email_handler)
    370     # We assume that external clients will always connect to us via the
    371     # hostname, so listening on the hostname ensures we pick the right network
    372     # interface.
    373     address = socket.gethostname()
    374     port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    375     server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    376     server.register_instance(frontend_server)
    377     logging.info('Listening on %s port %d', address, port)
    378     server.serve_forever()
    379