Home | History | Annotate | Download | only in rpm_control_system
      1 #!/usr/bin/python
      2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import atexit
      7 import errno
      8 import logging
      9 import re
     10 import sys
     11 import socket
     12 import threading
     13 import xmlrpclib
     14 
     15 import rpm_controller
     16 import rpm_logging_config
     17 
     18 from config import rpm_config
     19 from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
     20 from rpm_infrastructure_exception import RPMInfrastructureException
     21 
     22 import common
     23 from autotest_lib.site_utils.rpm_control_system import utils
     24 
     25 LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','dispatcher_logname_format')
     26 
     27 
     28 class RPMDispatcher(object):
     29     """
     30     This class is the RPM dispatcher server and it is responsible for
     31     communicating directly to the RPM devices to change a DUT's outlet status.
     32 
     33     When an RPMDispatcher is initialized it registers itself with the frontend
     34     server, who will field out outlet requests to this dispatcher.
     35 
     36     Once a request is received the dispatcher looks up the RPMController
     37     instance for the given DUT and then queues up the request and blocks until
     38     it is processed.
     39 
     40     @var _address: IP address or Hostname of this dispatcher server.
     41     @var _frontend_server: URI of the frontend server.
     42     @var _lock: Lock used to synchronize access to _worker_dict.
     43     @var _port: Port assigned to this server instance.
     44     @var _worker_dict: Dictionary mapping RPM hostname's to RPMController
     45                        instances.
     46     """
     47 
     48 
     49     def __init__(self, address, port):
     50         """
     51         RPMDispatcher constructor.
     52 
     53         Initialized instance vars and registers this server with the frontend
     54         server.
     55 
     56         @param address: Address of this dispatcher server.
     57         @param port: Port assigned to this dispatcher server.
     58 
     59         @raise RPMInfrastructureException: Raised if the dispatch server is
     60                                            unable to register with the frontend
     61                                            server.
     62         """
     63         self._address = address
     64         self._port = port
     65         self._lock = threading.Lock()
     66         self._worker_dict = {}
     67         self._frontend_server = rpm_config.get('RPM_INFRASTRUCTURE',
     68                                                'frontend_uri')
     69         logging.info('Registering this rpm dispatcher with the frontend '
     70                      'server at %s.', self._frontend_server)
     71         client = xmlrpclib.ServerProxy(self._frontend_server)
     72         # De-register with the frontend when the dispatcher exit's.
     73         atexit.register(self._unregister)
     74         try:
     75             client.register_dispatcher(self._get_serveruri())
     76         except socket.error as er:
     77             err_msg = ('Unable to register with frontend server. Error: %s.' %
     78                        errno.errorcode[er.errno])
     79             logging.error(err_msg)
     80             raise RPMInfrastructureException(err_msg)
     81 
     82 
     83     def _worker_dict_put(self, key, value):
     84         """
     85         Private method used to synchronize access to _worker_dict.
     86 
     87         @param key: key value we are using to access _worker_dict.
     88         @param value: value we are putting into _worker_dict.
     89         """
     90         with self._lock:
     91             self._worker_dict[key] = value
     92 
     93 
     94     def _worker_dict_get(self, key):
     95         """
     96         Private method used to synchronize access to _worker_dict.
     97 
     98         @param key: key value we are using to access _worker_dict.
     99         @return: value found when accessing _worker_dict
    100         """
    101         with self._lock:
    102             return self._worker_dict.get(key)
    103 
    104 
    105     def is_up(self):
    106         """
    107         Allows the frontend server to see if the dispatcher server is up before
    108         attempting to queue requests.
    109 
    110         @return: True. If connection fails, the client proxy will throw a socket
    111                  error on the client side.
    112         """
    113         return True
    114 
    115 
    116     def queue_request(self, powerunit_info_dict, new_state):
    117         """
    118         Looks up the appropriate RPMController instance for the device and queues
    119         up the request.
    120 
    121         @param powerunit_info_dict: A dictionary, containing the attribute/values
    122                                     of an unmarshalled PowerUnitInfo instance.
    123         @param new_state: [ON, OFF, CYCLE] state we want to the change the
    124                           outlet to.
    125         @return: True if the attempt to change power state was successful,
    126                  False otherwise.
    127         """
    128         powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict)
    129         logging.info('Received request to set device: %s to state: %s',
    130                      powerunit_info.device_hostname, new_state)
    131         rpm_controller = self._get_rpm_controller(
    132                 powerunit_info.powerunit_hostname,
    133                 powerunit_info.hydra_hostname)
    134         return rpm_controller.queue_request(powerunit_info, new_state)
    135 
    136 
    137     def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None):
    138         """
    139         Private method that retreives the appropriate RPMController instance
    140         for this RPM Hostname or calls _create_rpm_controller it if it does not
    141         already exist.
    142 
    143         @param rpm_hostname: hostname of the RPM whose RPMController we want.
    144 
    145         @return: RPMController instance responsible for this RPM.
    146         """
    147         if not rpm_hostname:
    148             return None
    149         rpm_controller = self._worker_dict_get(rpm_hostname)
    150         if not rpm_controller:
    151             rpm_controller = self._create_rpm_controller(
    152                     rpm_hostname, hydra_hostname)
    153             self._worker_dict_put(rpm_hostname, rpm_controller)
    154         return rpm_controller
    155 
    156 
    157     def _create_rpm_controller(self, rpm_hostname, hydra_hostname):
    158         """
    159         Determines the type of RPMController required and initializes it.
    160 
    161         @param rpm_hostname: Hostname of the RPM we need to communicate with.
    162 
    163         @return: RPMController instance responsible for this RPM.
    164         """
    165         hostname_elements = rpm_hostname.split('-')
    166         if hostname_elements[-2] == 'poe':
    167             # POE switch hostname looks like 'chromeos2-poe-switch1'.
    168             logging.info('The controller is a Cisco POE switch.')
    169             return rpm_controller.CiscoPOEController(rpm_hostname)
    170         else:
    171             # The device is an RPM.
    172             rack_id = hostname_elements[-2]
    173             rpm_typechecker = re.compile('rack[0-9]+[a-z]+')
    174             if rpm_typechecker.match(rack_id):
    175                 logging.info('RPM is a webpowered device.')
    176                 return rpm_controller.WebPoweredRPMController(rpm_hostname)
    177             else:
    178                 logging.info('RPM is a Sentry CDU device.')
    179                 return rpm_controller.SentryRPMController(
    180                         hostname=rpm_hostname,
    181                         hydra_hostname=hydra_hostname)
    182 
    183 
    184     def _get_serveruri(self):
    185         """
    186         Formats the _address and _port into a meaningful URI string.
    187 
    188         @return: URI of this dispatch server.
    189         """
    190         return 'http://%s:%d' % (self._address, self._port)
    191 
    192 
    193     def _unregister(self):
    194         """
    195         Tells the frontend server that this dispatch server is shutting down and
    196         to unregister it.
    197 
    198         Called by atexit.
    199 
    200         @raise RPMInfrastructureException: Raised if the dispatch server is
    201                                            unable to unregister with the
    202                                            frontend server.
    203         """
    204         logging.info('Dispatch server shutting down. Unregistering with RPM '
    205                      'frontend server.')
    206         client = xmlrpclib.ServerProxy(self._frontend_server)
    207         try:
    208             client.unregister_dispatcher(self._get_serveruri())
    209         except socket.error as er:
    210             err_msg = ('Unable to unregister with frontend server. Error: %s.' %
    211                        errno.errorcode[er.errno])
    212             logging.error(err_msg)
    213             raise RPMInfrastructureException(err_msg)
    214 
    215 
    216 def launch_server_on_unused_port():
    217     """
    218     Looks up an unused port on this host and launches the xmlrpc server.
    219 
    220     Useful for testing by running multiple dispatch servers on the same host.
    221 
    222     @return: server,port - server object and the port that which it is listening
    223              to.
    224     """
    225     address = socket.gethostbyname(socket.gethostname())
    226     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    227     # Set this socket to allow reuse.
    228     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    229     sock.bind(('', 0))
    230     port = sock.getsockname()[1]
    231     server = MultiThreadedXMLRPCServer((address, port),
    232                                        allow_none=True)
    233     sock.close()
    234     return server, port
    235 
    236 
    237 if __name__ == '__main__':
    238     """
    239     Main function used to launch the dispatch server. Creates an instance of
    240     RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance.
    241     """
    242     if len(sys.argv) > 1:
    243       print 'Usage: ./%s, no arguments available.' % sys.argv[0]
    244       sys.exit(1)
    245     rpm_logging_config.start_log_server(LOG_FILENAME_FORMAT)
    246     rpm_logging_config.set_up_logging(use_log_server=True)
    247 
    248     # Get the local ip _address and set the server to utilize it.
    249     address = socket.gethostbyname(socket.gethostname())
    250     server, port = launch_server_on_unused_port()
    251     rpm_dispatcher = RPMDispatcher(address, port)
    252     server.register_instance(rpm_dispatcher)
    253     server.serve_forever()
    254