Home | History | Annotate | Download | only in pylib
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """A "Test Server Spawner" that handles killing/stopping per-test test servers.
      6 
      7 It's used to accept requests from the device to spawn and kill instances of the
      8 chrome test server on the host.
      9 """
     10 # pylint: disable=W0702
     11 
     12 import BaseHTTPServer
     13 import json
     14 import logging
     15 import os
     16 import select
     17 import struct
     18 import subprocess
     19 import sys
     20 import threading
     21 import time
     22 import urlparse
     23 
     24 from pylib import constants
     25 from pylib import ports
     26 
     27 from pylib.forwarder import Forwarder
     28 
     29 
     30 # Path that are needed to import necessary modules when launching a testserver.
     31 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s'
     32     % (os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'),
     33        os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
     34        os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib',
     35                     'src'),
     36        os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
     37        os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver')))
     38 
     39 
     40 SERVER_TYPES = {
     41     'http': '',
     42     'ftp': '-f',
     43     'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
     44     'tcpecho': '--tcp-echo',
     45     'udpecho': '--udp-echo',
     46 }
     47 
     48 
     49 # The timeout (in seconds) of starting up the Python test server.
     50 TEST_SERVER_STARTUP_TIMEOUT = 10
     51 
     52 def _WaitUntil(predicate, max_attempts=5):
     53   """Blocks until the provided predicate (function) is true.
     54 
     55   Returns:
     56     Whether the provided predicate was satisfied once (before the timeout).
     57   """
     58   sleep_time_sec = 0.025
     59   for _ in xrange(1, max_attempts):
     60     if predicate():
     61       return True
     62     time.sleep(sleep_time_sec)
     63     sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
     64   return False
     65 
     66 
     67 def _CheckPortStatus(port, expected_status):
     68   """Returns True if port has expected_status.
     69 
     70   Args:
     71     port: the port number.
     72     expected_status: boolean of expected status.
     73 
     74   Returns:
     75     Returns True if the status is expected. Otherwise returns False.
     76   """
     77   return _WaitUntil(lambda: ports.IsHostPortUsed(port) == expected_status)
     78 
     79 
     80 def _CheckDevicePortStatus(device, port):
     81   """Returns whether the provided port is used."""
     82   return _WaitUntil(lambda: ports.IsDevicePortUsed(device, port))
     83 
     84 
     85 def _GetServerTypeCommandLine(server_type):
     86   """Returns the command-line by the given server type.
     87 
     88   Args:
     89     server_type: the server type to be used (e.g. 'http').
     90 
     91   Returns:
     92     A string containing the command-line argument.
     93   """
     94   if server_type not in SERVER_TYPES:
     95     raise NotImplementedError('Unknown server type: %s' % server_type)
     96   if server_type == 'udpecho':
     97     raise Exception('Please do not run UDP echo tests because we do not have '
     98                     'a UDP forwarder tool.')
     99   return SERVER_TYPES[server_type]
    100 
    101 
    102 class TestServerThread(threading.Thread):
    103   """A thread to run the test server in a separate process."""
    104 
    105   def __init__(self, ready_event, arguments, device, tool):
    106     """Initialize TestServerThread with the following argument.
    107 
    108     Args:
    109       ready_event: event which will be set when the test server is ready.
    110       arguments: dictionary of arguments to run the test server.
    111       device: An instance of DeviceUtils.
    112       tool: instance of runtime error detection tool.
    113     """
    114     threading.Thread.__init__(self)
    115     self.wait_event = threading.Event()
    116     self.stop_flag = False
    117     self.ready_event = ready_event
    118     self.ready_event.clear()
    119     self.arguments = arguments
    120     self.device = device
    121     self.tool = tool
    122     self.test_server_process = None
    123     self.is_ready = False
    124     self.host_port = self.arguments['port']
    125     assert isinstance(self.host_port, int)
    126     # The forwarder device port now is dynamically allocated.
    127     self.forwarder_device_port = 0
    128     # Anonymous pipe in order to get port info from test server.
    129     self.pipe_in = None
    130     self.pipe_out = None
    131     self.process = None
    132     self.command_line = []
    133 
    134   def _WaitToStartAndGetPortFromTestServer(self):
    135     """Waits for the Python test server to start and gets the port it is using.
    136 
    137     The port information is passed by the Python test server with a pipe given
    138     by self.pipe_out. It is written as a result to |self.host_port|.
    139 
    140     Returns:
    141       Whether the port used by the test server was successfully fetched.
    142     """
    143     assert self.host_port == 0 and self.pipe_out and self.pipe_in
    144     (in_fds, _, _) = select.select([self.pipe_in, ], [], [],
    145                                    TEST_SERVER_STARTUP_TIMEOUT)
    146     if len(in_fds) == 0:
    147       logging.error('Failed to wait to the Python test server to be started.')
    148       return False
    149     # First read the data length as an unsigned 4-byte value.  This
    150     # is _not_ using network byte ordering since the Python test server packs
    151     # size as native byte order and all Chromium platforms so far are
    152     # configured to use little-endian.
    153     # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    154     # use a unified byte order (either big-endian or little-endian).
    155     data_length = os.read(self.pipe_in, struct.calcsize('=L'))
    156     if data_length:
    157       (data_length,) = struct.unpack('=L', data_length)
    158       assert data_length
    159     if not data_length:
    160       logging.error('Failed to get length of server data.')
    161       return False
    162     port_json = os.read(self.pipe_in, data_length)
    163     if not port_json:
    164       logging.error('Failed to get server data.')
    165       return False
    166     logging.info('Got port json data: %s', port_json)
    167     port_json = json.loads(port_json)
    168     if port_json.has_key('port') and isinstance(port_json['port'], int):
    169       self.host_port = port_json['port']
    170       return _CheckPortStatus(self.host_port, True)
    171     logging.error('Failed to get port information from the server data.')
    172     return False
    173 
    174   def _GenerateCommandLineArguments(self):
    175     """Generates the command line to run the test server.
    176 
    177     Note that all options are processed by following the definitions in
    178     testserver.py.
    179     """
    180     if self.command_line:
    181       return
    182 
    183     args_copy = dict(self.arguments)
    184 
    185     # Translate the server type.
    186     type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type'))
    187     if type_cmd:
    188       self.command_line.append(type_cmd)
    189 
    190     # Use a pipe to get the port given by the instance of Python test server
    191     # if the test does not specify the port.
    192     assert self.host_port == args_copy['port']
    193     if self.host_port == 0:
    194       (self.pipe_in, self.pipe_out) = os.pipe()
    195       self.command_line.append('--startup-pipe=%d' % self.pipe_out)
    196 
    197     # Pass the remaining arguments as-is.
    198     for key, values in args_copy.iteritems():
    199       if not isinstance(values, list):
    200         values = [values]
    201       for value in values:
    202         if value is None:
    203           self.command_line.append('--%s' % key)
    204         else:
    205           self.command_line.append('--%s=%s' % (key, value))
    206 
    207   def _CloseUnnecessaryFDsForTestServerProcess(self):
    208     # This is required to avoid subtle deadlocks that could be caused by the
    209     # test server child process inheriting undesirable file descriptors such as
    210     # file lock file descriptors.
    211     for fd in xrange(0, 1024):
    212       if fd != self.pipe_out:
    213         try:
    214           os.close(fd)
    215         except:
    216           pass
    217 
    218   def run(self):
    219     logging.info('Start running the thread!')
    220     self.wait_event.clear()
    221     self._GenerateCommandLineArguments()
    222     command = constants.DIR_SOURCE_ROOT
    223     if self.arguments['server-type'] == 'sync':
    224       command = [os.path.join(command, 'sync', 'tools', 'testserver',
    225                               'sync_testserver.py')] + self.command_line
    226     else:
    227       command = [os.path.join(command, 'net', 'tools', 'testserver',
    228                               'testserver.py')] + self.command_line
    229     logging.info('Running: %s', command)
    230     # Pass DIR_SOURCE_ROOT as the child's working directory so that relative
    231     # paths in the arguments are resolved correctly.
    232     self.process = subprocess.Popen(
    233         command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess,
    234         cwd=constants.DIR_SOURCE_ROOT)
    235     if self.process:
    236       if self.pipe_out:
    237         self.is_ready = self._WaitToStartAndGetPortFromTestServer()
    238       else:
    239         self.is_ready = _CheckPortStatus(self.host_port, True)
    240     if self.is_ready:
    241       Forwarder.Map([(0, self.host_port)], self.device, self.tool)
    242       # Check whether the forwarder is ready on the device.
    243       self.is_ready = False
    244       device_port = Forwarder.DevicePortForHostPort(self.host_port)
    245       if device_port and _CheckDevicePortStatus(self.device, device_port):
    246         self.is_ready = True
    247         self.forwarder_device_port = device_port
    248     # Wake up the request handler thread.
    249     self.ready_event.set()
    250     # Keep thread running until Stop() gets called.
    251     _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
    252     if self.process.poll() is None:
    253       self.process.kill()
    254     Forwarder.UnmapDevicePort(self.forwarder_device_port, self.device)
    255     self.process = None
    256     self.is_ready = False
    257     if self.pipe_out:
    258       os.close(self.pipe_in)
    259       os.close(self.pipe_out)
    260       self.pipe_in = None
    261       self.pipe_out = None
    262     logging.info('Test-server has died.')
    263     self.wait_event.set()
    264 
    265   def Stop(self):
    266     """Blocks until the loop has finished.
    267 
    268     Note that this must be called in another thread.
    269     """
    270     if not self.process:
    271       return
    272     self.stop_flag = True
    273     self.wait_event.wait()
    274 
    275 
    276 class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    277   """A handler used to process http GET/POST request."""
    278 
    279   def _SendResponse(self, response_code, response_reason, additional_headers,
    280                     contents):
    281     """Generates a response sent to the client from the provided parameters.
    282 
    283     Args:
    284       response_code: number of the response status.
    285       response_reason: string of reason description of the response.
    286       additional_headers: dict of additional headers. Each key is the name of
    287                           the header, each value is the content of the header.
    288       contents: string of the contents we want to send to client.
    289     """
    290     self.send_response(response_code, response_reason)
    291     self.send_header('Content-Type', 'text/html')
    292     # Specify the content-length as without it the http(s) response will not
    293     # be completed properly (and the browser keeps expecting data).
    294     self.send_header('Content-Length', len(contents))
    295     for header_name in additional_headers:
    296       self.send_header(header_name, additional_headers[header_name])
    297     self.end_headers()
    298     self.wfile.write(contents)
    299     self.wfile.flush()
    300 
    301   def _StartTestServer(self):
    302     """Starts the test server thread."""
    303     logging.info('Handling request to spawn a test server.')
    304     content_type = self.headers.getheader('content-type')
    305     if content_type != 'application/json':
    306       raise Exception('Bad content-type for start request.')
    307     content_length = self.headers.getheader('content-length')
    308     if not content_length:
    309       content_length = 0
    310     try:
    311       content_length = int(content_length)
    312     except:
    313       raise Exception('Bad content-length for start request.')
    314     logging.info(content_length)
    315     test_server_argument_json = self.rfile.read(content_length)
    316     logging.info(test_server_argument_json)
    317     assert not self.server.test_server_instance
    318     ready_event = threading.Event()
    319     self.server.test_server_instance = TestServerThread(
    320         ready_event,
    321         json.loads(test_server_argument_json),
    322         self.server.device,
    323         self.server.tool)
    324     self.server.test_server_instance.setDaemon(True)
    325     self.server.test_server_instance.start()
    326     ready_event.wait()
    327     if self.server.test_server_instance.is_ready:
    328       self._SendResponse(200, 'OK', {}, json.dumps(
    329           {'port': self.server.test_server_instance.forwarder_device_port,
    330            'message': 'started'}))
    331       logging.info('Test server is running on port: %d.',
    332                    self.server.test_server_instance.host_port)
    333     else:
    334       self.server.test_server_instance.Stop()
    335       self.server.test_server_instance = None
    336       self._SendResponse(500, 'Test Server Error.', {}, '')
    337       logging.info('Encounter problem during starting a test server.')
    338 
    339   def _KillTestServer(self):
    340     """Stops the test server instance."""
    341     # There should only ever be one test server at a time. This may do the
    342     # wrong thing if we try and start multiple test servers.
    343     if not self.server.test_server_instance:
    344       return
    345     port = self.server.test_server_instance.host_port
    346     logging.info('Handling request to kill a test server on port: %d.', port)
    347     self.server.test_server_instance.Stop()
    348     # Make sure the status of test server is correct before sending response.
    349     if _CheckPortStatus(port, False):
    350       self._SendResponse(200, 'OK', {}, 'killed')
    351       logging.info('Test server on port %d is killed', port)
    352     else:
    353       self._SendResponse(500, 'Test Server Error.', {}, '')
    354       logging.info('Encounter problem during killing a test server.')
    355     self.server.test_server_instance = None
    356 
    357   def do_POST(self):
    358     parsed_path = urlparse.urlparse(self.path)
    359     action = parsed_path.path
    360     logging.info('Action for POST method is: %s.', action)
    361     if action == '/start':
    362       self._StartTestServer()
    363     else:
    364       self._SendResponse(400, 'Unknown request.', {}, '')
    365       logging.info('Encounter unknown request: %s.', action)
    366 
    367   def do_GET(self):
    368     parsed_path = urlparse.urlparse(self.path)
    369     action = parsed_path.path
    370     params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    371     logging.info('Action for GET method is: %s.', action)
    372     for param in params:
    373       logging.info('%s=%s', param, params[param][0])
    374     if action == '/kill':
    375       self._KillTestServer()
    376     elif action == '/ping':
    377       # The ping handler is used to check whether the spawner server is ready
    378       # to serve the requests. We don't need to test the status of the test
    379       # server when handling ping request.
    380       self._SendResponse(200, 'OK', {}, 'ready')
    381       logging.info('Handled ping request and sent response.')
    382     else:
    383       self._SendResponse(400, 'Unknown request', {}, '')
    384       logging.info('Encounter unknown request: %s.', action)
    385 
    386 
    387 class SpawningServer(object):
    388   """The class used to start/stop a http server."""
    389 
    390   def __init__(self, test_server_spawner_port, device, tool):
    391     logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    392     self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
    393                                             SpawningServerRequestHandler)
    394     self.server.device = device
    395     self.server.tool = tool
    396     self.server.test_server_instance = None
    397     self.server.build_type = constants.GetBuildType()
    398 
    399   def _Listen(self):
    400     logging.info('Starting test server spawner')
    401     self.server.serve_forever()
    402 
    403   def Start(self):
    404     """Starts the test server spawner."""
    405     listener_thread = threading.Thread(target=self._Listen)
    406     listener_thread.setDaemon(True)
    407     listener_thread.start()
    408 
    409   def Stop(self):
    410     """Stops the test server spawner.
    411 
    412     Also cleans the server state.
    413     """
    414     self.CleanupState()
    415     self.server.shutdown()
    416 
    417   def CleanupState(self):
    418     """Cleans up the spawning server state.
    419 
    420     This should be called if the test server spawner is reused,
    421     to avoid sharing the test server instance.
    422     """
    423     if self.server.test_server_instance:
    424       self.server.test_server_instance.Stop()
    425       self.server.test_server_instance = None
    426