Home | History | Annotate | Download | only in rpm_control_system
      1 #!/usr/bin/python
      2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import atexit
      7 import errno
      8 import logging
      9 import re
     10 import sys
     11 import socket
     12 import threading
     13 import xmlrpclib
     14 
     15 import rpm_controller
     16 import rpm_logging_config
     17 
     18 from config import rpm_config
     19 from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
     20 from rpm_infrastructure_exception import RPMInfrastructureException
     21 
     22 import common
     23 from autotest_lib.site_utils.rpm_control_system import utils
     24 
     25 LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','dispatcher_logname_format')
     26 
     27 
     28 class RPMDispatcher(object):
     29     """
     30     This class is the RPM dispatcher server and it is responsible for
     31     communicating directly to the RPM devices to change a DUT's outlet status.
     32 
     33     When an RPMDispatcher is initialized it registers itself with the frontend
     34     server, who will field out outlet requests to this dispatcher.
     35 
     36     Once a request is received the dispatcher looks up the RPMController
     37     instance for the given DUT and then queues up the request and blocks until
     38     it is processed.
     39 
     40     @var _address: IP address or Hostname of this dispatcher server.
     41     @var _frontend_server: URI of the frontend server.
     42     @var _lock: Lock used to synchronize access to _worker_dict.
     43     @var _port: Port assigned to this server instance.
     44     @var _worker_dict: Dictionary mapping RPM hostname's to RPMController
     45                        instances.
     46     """
     47 
     48 
     49     def __init__(self, address, port):
     50         """
     51         RPMDispatcher constructor.
     52 
     53         Initialized instance vars and registers this server with the frontend
     54         server.
     55 
     56         @param address: Address of this dispatcher server.
     57         @param port: Port assigned to this dispatcher server.
     58 
     59         @raise RPMInfrastructureException: Raised if the dispatch server is
     60                                            unable to register with the frontend
     61                                            server.
     62         """
     63         self._address = address
     64         self._port = port
     65         self._lock = threading.Lock()
     66         self._worker_dict = {}
     67         # We assume that the frontend server and dispatchers are running on the
     68         # same host, and the frontend server is listening for connections from
     69         # the external world.
     70         frontend_server_port = rpm_config.getint('RPM_INFRASTRUCTURE',
     71                                                  'frontend_port')
     72         self._frontend_server = 'http://%s:%d' % (socket.gethostname(),
     73                                                   frontend_server_port)
     74         logging.info('Registering this rpm dispatcher with the frontend '
     75                      'server at %s.', self._frontend_server)
     76         client = xmlrpclib.ServerProxy(self._frontend_server)
     77         # De-register with the frontend when the dispatcher exit's.
     78         atexit.register(self._unregister)
     79         try:
     80             client.register_dispatcher(self._get_serveruri())
     81         except socket.error as er:
     82             err_msg = ('Unable to register with frontend server. Error: %s.' %
     83                        errno.errorcode[er.errno])
     84             logging.error(err_msg)
     85             raise RPMInfrastructureException(err_msg)
     86 
     87 
     88     def _worker_dict_put(self, key, value):
     89         """
     90         Private method used to synchronize access to _worker_dict.
     91 
     92         @param key: key value we are using to access _worker_dict.
     93         @param value: value we are putting into _worker_dict.
     94         """
     95         with self._lock:
     96             self._worker_dict[key] = value
     97 
     98 
     99     def _worker_dict_get(self, key):
    100         """
    101         Private method used to synchronize access to _worker_dict.
    102 
    103         @param key: key value we are using to access _worker_dict.
    104         @return: value found when accessing _worker_dict
    105         """
    106         with self._lock:
    107             return self._worker_dict.get(key)
    108 
    109 
    110     def is_up(self):
    111         """
    112         Allows the frontend server to see if the dispatcher server is up before
    113         attempting to queue requests.
    114 
    115         @return: True. If connection fails, the client proxy will throw a socket
    116                  error on the client side.
    117         """
    118         return True
    119 
    120 
    121     def queue_request(self, powerunit_info_dict, new_state):
    122         """
    123         Looks up the appropriate RPMController instance for the device and queues
    124         up the request.
    125 
    126         @param powerunit_info_dict: A dictionary, containing the attribute/values
    127                                     of an unmarshalled PowerUnitInfo instance.
    128         @param new_state: [ON, OFF, CYCLE] state we want to the change the
    129                           outlet to.
    130         @return: True if the attempt to change power state was successful,
    131                  False otherwise.
    132         """
    133         powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict)
    134         logging.info('Received request to set device: %s to state: %s',
    135                      powerunit_info.device_hostname, new_state)
    136         rpm_controller = self._get_rpm_controller(
    137                 powerunit_info.powerunit_hostname,
    138                 powerunit_info.hydra_hostname)
    139         return rpm_controller.queue_request(powerunit_info, new_state)
    140 
    141 
    142     def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None):
    143         """
    144         Private method that retreives the appropriate RPMController instance
    145         for this RPM Hostname or calls _create_rpm_controller it if it does not
    146         already exist.
    147 
    148         @param rpm_hostname: hostname of the RPM whose RPMController we want.
    149 
    150         @return: RPMController instance responsible for this RPM.
    151         """
    152         if not rpm_hostname:
    153             return None
    154         rpm_controller = self._worker_dict_get(rpm_hostname)
    155         if not rpm_controller:
    156             rpm_controller = self._create_rpm_controller(
    157                     rpm_hostname, hydra_hostname)
    158             self._worker_dict_put(rpm_hostname, rpm_controller)
    159         return rpm_controller
    160 
    161 
    162     def _create_rpm_controller(self, rpm_hostname, hydra_hostname):
    163         """
    164         Determines the type of RPMController required and initializes it.
    165 
    166         @param rpm_hostname: Hostname of the RPM we need to communicate with.
    167 
    168         @return: RPMController instance responsible for this RPM.
    169         """
    170         hostname_elements = rpm_hostname.split('-')
    171         if hostname_elements[-2] == 'poe':
    172             # POE switch hostname looks like 'chromeos2-poe-switch1'.
    173             logging.info('The controller is a Cisco POE switch.')
    174             return rpm_controller.CiscoPOEController(rpm_hostname)
    175         else:
    176             # The device is an RPM.
    177             rack_id = hostname_elements[-2]
    178             rpm_typechecker = re.compile('rack[0-9]+[a-z]+')
    179             if rpm_typechecker.match(rack_id):
    180                 logging.info('RPM is a webpowered device.')
    181                 return rpm_controller.WebPoweredRPMController(rpm_hostname)
    182             else:
    183                 logging.info('RPM is a Sentry CDU device.')
    184                 return rpm_controller.SentryRPMController(
    185                         hostname=rpm_hostname,
    186                         hydra_hostname=hydra_hostname)
    187 
    188 
    189     def _get_serveruri(self):
    190         """
    191         Formats the _address and _port into a meaningful URI string.
    192 
    193         @return: URI of this dispatch server.
    194         """
    195         return 'http://%s:%d' % (self._address, self._port)
    196 
    197 
    198     def _unregister(self):
    199         """
    200         Tells the frontend server that this dispatch server is shutting down and
    201         to unregister it.
    202 
    203         Called by atexit.
    204 
    205         @raise RPMInfrastructureException: Raised if the dispatch server is
    206                                            unable to unregister with the
    207                                            frontend server.
    208         """
    209         logging.info('Dispatch server shutting down. Unregistering with RPM '
    210                      'frontend server.')
    211         client = xmlrpclib.ServerProxy(self._frontend_server)
    212         try:
    213             client.unregister_dispatcher(self._get_serveruri())
    214         except socket.error as er:
    215             err_msg = ('Unable to unregister with frontend server. Error: %s.' %
    216                        errno.errorcode[er.errno])
    217             logging.error(err_msg)
    218             raise RPMInfrastructureException(err_msg)
    219 
    220 
    221 def launch_server_on_unused_port():
    222     """
    223     Looks up an unused port on this host and launches the xmlrpc server.
    224 
    225     Useful for testing by running multiple dispatch servers on the same host.
    226 
    227     @return: server,port - server object and the port that which it is listening
    228              to.
    229     """
    230     address = socket.gethostbyname(socket.gethostname())
    231     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    232     # Set this socket to allow reuse.
    233     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    234     sock.bind(('', 0))
    235     port = sock.getsockname()[1]
    236     server = MultiThreadedXMLRPCServer((address, port),
    237                                        allow_none=True)
    238     sock.close()
    239     return server, port
    240 
    241 
    242 if __name__ == '__main__':
    243     """
    244     Main function used to launch the dispatch server. Creates an instance of
    245     RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance.
    246     """
    247     if len(sys.argv) != 2:
    248       print 'Usage: ./%s <log_file_name>' % sys.argv[0]
    249       sys.exit(1)
    250 
    251     rpm_logging_config.start_log_server(sys.argv[1], LOG_FILENAME_FORMAT)
    252     rpm_logging_config.set_up_logging_to_server()
    253 
    254     # Get the local ip _address and set the server to utilize it.
    255     address = socket.gethostbyname(socket.gethostname())
    256     server, port = launch_server_on_unused_port()
    257     rpm_dispatcher = RPMDispatcher(address, port)
    258     server.register_instance(rpm_dispatcher)
    259     server.serve_forever()
    260