Home | History | Annotate | Download | only in pylib
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """A "Test Server Spawner" that handles killing/stopping per-test test servers.
      6 
      7 It's used to accept requests from the device to spawn and kill instances of the
      8 chrome test server on the host.
      9 """
     10 # pylint: disable=W0702
     11 
     12 import BaseHTTPServer
     13 import json
     14 import logging
     15 import os
     16 import select
     17 import struct
     18 import subprocess
     19 import sys
     20 import threading
     21 import time
     22 import urlparse
     23 
     24 from devil.android import forwarder
     25 from devil.android import ports
     26 
     27 from pylib import constants
     28 from pylib.constants import host_paths
     29 
     30 
     31 # Path that are needed to import necessary modules when launching a testserver.
     32 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s'
     33     % (os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party'),
     34        os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
     35        os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib',
     36                     'src'),
     37        os.path.join(host_paths.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
     38        os.path.join(host_paths.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver')))
     39 
     40 
     41 SERVER_TYPES = {
     42     'http': '',
     43     'ftp': '-f',
     44     'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
     45     'tcpecho': '--tcp-echo',
     46     'udpecho': '--udp-echo',
     47 }
     48 
     49 
     50 # The timeout (in seconds) of starting up the Python test server.
     51 TEST_SERVER_STARTUP_TIMEOUT = 10
     52 
     53 def _WaitUntil(predicate, max_attempts=5):
     54   """Blocks until the provided predicate (function) is true.
     55 
     56   Returns:
     57     Whether the provided predicate was satisfied once (before the timeout).
     58   """
     59   sleep_time_sec = 0.025
     60   for _ in xrange(1, max_attempts):
     61     if predicate():
     62       return True
     63     time.sleep(sleep_time_sec)
     64     sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
     65   return False
     66 
     67 
     68 def _CheckPortAvailable(port):
     69   """Returns True if |port| is available."""
     70   return _WaitUntil(lambda: ports.IsHostPortAvailable(port))
     71 
     72 
     73 def _CheckPortNotAvailable(port):
     74   """Returns True if |port| is not available."""
     75   return _WaitUntil(lambda: not ports.IsHostPortAvailable(port))
     76 
     77 
     78 def _CheckDevicePortStatus(device, port):
     79   """Returns whether the provided port is used."""
     80   return _WaitUntil(lambda: ports.IsDevicePortUsed(device, port))
     81 
     82 
     83 def _GetServerTypeCommandLine(server_type):
     84   """Returns the command-line by the given server type.
     85 
     86   Args:
     87     server_type: the server type to be used (e.g. 'http').
     88 
     89   Returns:
     90     A string containing the command-line argument.
     91   """
     92   if server_type not in SERVER_TYPES:
     93     raise NotImplementedError('Unknown server type: %s' % server_type)
     94   if server_type == 'udpecho':
     95     raise Exception('Please do not run UDP echo tests because we do not have '
     96                     'a UDP forwarder tool.')
     97   return SERVER_TYPES[server_type]
     98 
     99 
    100 class TestServerThread(threading.Thread):
    101   """A thread to run the test server in a separate process."""
    102 
    103   def __init__(self, ready_event, arguments, device, tool):
    104     """Initialize TestServerThread with the following argument.
    105 
    106     Args:
    107       ready_event: event which will be set when the test server is ready.
    108       arguments: dictionary of arguments to run the test server.
    109       device: An instance of DeviceUtils.
    110       tool: instance of runtime error detection tool.
    111     """
    112     threading.Thread.__init__(self)
    113     self.wait_event = threading.Event()
    114     self.stop_flag = False
    115     self.ready_event = ready_event
    116     self.ready_event.clear()
    117     self.arguments = arguments
    118     self.device = device
    119     self.tool = tool
    120     self.test_server_process = None
    121     self.is_ready = False
    122     self.host_port = self.arguments['port']
    123     assert isinstance(self.host_port, int)
    124     # The forwarder device port now is dynamically allocated.
    125     self.forwarder_device_port = 0
    126     # Anonymous pipe in order to get port info from test server.
    127     self.pipe_in = None
    128     self.pipe_out = None
    129     self.process = None
    130     self.command_line = []
    131 
    132   def _WaitToStartAndGetPortFromTestServer(self):
    133     """Waits for the Python test server to start and gets the port it is using.
    134 
    135     The port information is passed by the Python test server with a pipe given
    136     by self.pipe_out. It is written as a result to |self.host_port|.
    137 
    138     Returns:
    139       Whether the port used by the test server was successfully fetched.
    140     """
    141     assert self.host_port == 0 and self.pipe_out and self.pipe_in
    142     (in_fds, _, _) = select.select([self.pipe_in, ], [], [],
    143                                    TEST_SERVER_STARTUP_TIMEOUT)
    144     if len(in_fds) == 0:
    145       logging.error('Failed to wait to the Python test server to be started.')
    146       return False
    147     # First read the data length as an unsigned 4-byte value.  This
    148     # is _not_ using network byte ordering since the Python test server packs
    149     # size as native byte order and all Chromium platforms so far are
    150     # configured to use little-endian.
    151     # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    152     # use a unified byte order (either big-endian or little-endian).
    153     data_length = os.read(self.pipe_in, struct.calcsize('=L'))
    154     if data_length:
    155       (data_length,) = struct.unpack('=L', data_length)
    156       assert data_length
    157     if not data_length:
    158       logging.error('Failed to get length of server data.')
    159       return False
    160     port_json = os.read(self.pipe_in, data_length)
    161     if not port_json:
    162       logging.error('Failed to get server data.')
    163       return False
    164     logging.info('Got port json data: %s', port_json)
    165     port_json = json.loads(port_json)
    166     if port_json.has_key('port') and isinstance(port_json['port'], int):
    167       self.host_port = port_json['port']
    168       return _CheckPortNotAvailable(self.host_port)
    169     logging.error('Failed to get port information from the server data.')
    170     return False
    171 
    172   def _GenerateCommandLineArguments(self):
    173     """Generates the command line to run the test server.
    174 
    175     Note that all options are processed by following the definitions in
    176     testserver.py.
    177     """
    178     if self.command_line:
    179       return
    180 
    181     args_copy = dict(self.arguments)
    182 
    183     # Translate the server type.
    184     type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type'))
    185     if type_cmd:
    186       self.command_line.append(type_cmd)
    187 
    188     # Use a pipe to get the port given by the instance of Python test server
    189     # if the test does not specify the port.
    190     assert self.host_port == args_copy['port']
    191     if self.host_port == 0:
    192       (self.pipe_in, self.pipe_out) = os.pipe()
    193       self.command_line.append('--startup-pipe=%d' % self.pipe_out)
    194 
    195     # Pass the remaining arguments as-is.
    196     for key, values in args_copy.iteritems():
    197       if not isinstance(values, list):
    198         values = [values]
    199       for value in values:
    200         if value is None:
    201           self.command_line.append('--%s' % key)
    202         else:
    203           self.command_line.append('--%s=%s' % (key, value))
    204 
    205   def _CloseUnnecessaryFDsForTestServerProcess(self):
    206     # This is required to avoid subtle deadlocks that could be caused by the
    207     # test server child process inheriting undesirable file descriptors such as
    208     # file lock file descriptors.
    209     for fd in xrange(0, 1024):
    210       if fd != self.pipe_out:
    211         try:
    212           os.close(fd)
    213         except:
    214           pass
    215 
    216   def run(self):
    217     logging.info('Start running the thread!')
    218     self.wait_event.clear()
    219     self._GenerateCommandLineArguments()
    220     command = host_paths.DIR_SOURCE_ROOT
    221     if self.arguments['server-type'] == 'sync':
    222       command = [os.path.join(command, 'sync', 'tools', 'testserver',
    223                               'sync_testserver.py')] + self.command_line
    224     else:
    225       command = [os.path.join(command, 'net', 'tools', 'testserver',
    226                               'testserver.py')] + self.command_line
    227     logging.info('Running: %s', command)
    228 
    229     # Disable PYTHONUNBUFFERED because it has a bad interaction with the
    230     # testserver. Remove once this interaction is fixed.
    231     unbuf = os.environ.pop('PYTHONUNBUFFERED', None)
    232 
    233     # Pass DIR_SOURCE_ROOT as the child's working directory so that relative
    234     # paths in the arguments are resolved correctly.
    235     self.process = subprocess.Popen(
    236         command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess,
    237         cwd=host_paths.DIR_SOURCE_ROOT)
    238     if unbuf:
    239       os.environ['PYTHONUNBUFFERED'] = unbuf
    240     if self.process:
    241       if self.pipe_out:
    242         self.is_ready = self._WaitToStartAndGetPortFromTestServer()
    243       else:
    244         self.is_ready = _CheckPortNotAvailable(self.host_port)
    245     if self.is_ready:
    246       forwarder.Forwarder.Map([(0, self.host_port)], self.device, self.tool)
    247       # Check whether the forwarder is ready on the device.
    248       self.is_ready = False
    249       device_port = forwarder.Forwarder.DevicePortForHostPort(self.host_port)
    250       if device_port and _CheckDevicePortStatus(self.device, device_port):
    251         self.is_ready = True
    252         self.forwarder_device_port = device_port
    253     # Wake up the request handler thread.
    254     self.ready_event.set()
    255     # Keep thread running until Stop() gets called.
    256     _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
    257     if self.process.poll() is None:
    258       self.process.kill()
    259     forwarder.Forwarder.UnmapDevicePort(self.forwarder_device_port, self.device)
    260     self.process = None
    261     self.is_ready = False
    262     if self.pipe_out:
    263       os.close(self.pipe_in)
    264       os.close(self.pipe_out)
    265       self.pipe_in = None
    266       self.pipe_out = None
    267     logging.info('Test-server has died.')
    268     self.wait_event.set()
    269 
    270   def Stop(self):
    271     """Blocks until the loop has finished.
    272 
    273     Note that this must be called in another thread.
    274     """
    275     if not self.process:
    276       return
    277     self.stop_flag = True
    278     self.wait_event.wait()
    279 
    280 
    281 class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    282   """A handler used to process http GET/POST request."""
    283 
    284   def _SendResponse(self, response_code, response_reason, additional_headers,
    285                     contents):
    286     """Generates a response sent to the client from the provided parameters.
    287 
    288     Args:
    289       response_code: number of the response status.
    290       response_reason: string of reason description of the response.
    291       additional_headers: dict of additional headers. Each key is the name of
    292                           the header, each value is the content of the header.
    293       contents: string of the contents we want to send to client.
    294     """
    295     self.send_response(response_code, response_reason)
    296     self.send_header('Content-Type', 'text/html')
    297     # Specify the content-length as without it the http(s) response will not
    298     # be completed properly (and the browser keeps expecting data).
    299     self.send_header('Content-Length', len(contents))
    300     for header_name in additional_headers:
    301       self.send_header(header_name, additional_headers[header_name])
    302     self.end_headers()
    303     self.wfile.write(contents)
    304     self.wfile.flush()
    305 
    306   def _StartTestServer(self):
    307     """Starts the test server thread."""
    308     logging.info('Handling request to spawn a test server.')
    309     content_type = self.headers.getheader('content-type')
    310     if content_type != 'application/json':
    311       raise Exception('Bad content-type for start request.')
    312     content_length = self.headers.getheader('content-length')
    313     if not content_length:
    314       content_length = 0
    315     try:
    316       content_length = int(content_length)
    317     except:
    318       raise Exception('Bad content-length for start request.')
    319     logging.info(content_length)
    320     test_server_argument_json = self.rfile.read(content_length)
    321     logging.info(test_server_argument_json)
    322     assert not self.server.test_server_instance
    323     ready_event = threading.Event()
    324     self.server.test_server_instance = TestServerThread(
    325         ready_event,
    326         json.loads(test_server_argument_json),
    327         self.server.device,
    328         self.server.tool)
    329     self.server.test_server_instance.setDaemon(True)
    330     self.server.test_server_instance.start()
    331     ready_event.wait()
    332     if self.server.test_server_instance.is_ready:
    333       self._SendResponse(200, 'OK', {}, json.dumps(
    334           {'port': self.server.test_server_instance.forwarder_device_port,
    335            'message': 'started'}))
    336       logging.info('Test server is running on port: %d.',
    337                    self.server.test_server_instance.host_port)
    338     else:
    339       self.server.test_server_instance.Stop()
    340       self.server.test_server_instance = None
    341       self._SendResponse(500, 'Test Server Error.', {}, '')
    342       logging.info('Encounter problem during starting a test server.')
    343 
    344   def _KillTestServer(self):
    345     """Stops the test server instance."""
    346     # There should only ever be one test server at a time. This may do the
    347     # wrong thing if we try and start multiple test servers.
    348     if not self.server.test_server_instance:
    349       return
    350     port = self.server.test_server_instance.host_port
    351     logging.info('Handling request to kill a test server on port: %d.', port)
    352     self.server.test_server_instance.Stop()
    353     # Make sure the status of test server is correct before sending response.
    354     if _CheckPortAvailable(port):
    355       self._SendResponse(200, 'OK', {}, 'killed')
    356       logging.info('Test server on port %d is killed', port)
    357     else:
    358       self._SendResponse(500, 'Test Server Error.', {}, '')
    359       logging.info('Encounter problem during killing a test server.')
    360     self.server.test_server_instance = None
    361 
    362   def do_POST(self):
    363     parsed_path = urlparse.urlparse(self.path)
    364     action = parsed_path.path
    365     logging.info('Action for POST method is: %s.', action)
    366     if action == '/start':
    367       self._StartTestServer()
    368     else:
    369       self._SendResponse(400, 'Unknown request.', {}, '')
    370       logging.info('Encounter unknown request: %s.', action)
    371 
    372   def do_GET(self):
    373     parsed_path = urlparse.urlparse(self.path)
    374     action = parsed_path.path
    375     params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    376     logging.info('Action for GET method is: %s.', action)
    377     for param in params:
    378       logging.info('%s=%s', param, params[param][0])
    379     if action == '/kill':
    380       self._KillTestServer()
    381     elif action == '/ping':
    382       # The ping handler is used to check whether the spawner server is ready
    383       # to serve the requests. We don't need to test the status of the test
    384       # server when handling ping request.
    385       self._SendResponse(200, 'OK', {}, 'ready')
    386       logging.info('Handled ping request and sent response.')
    387     else:
    388       self._SendResponse(400, 'Unknown request', {}, '')
    389       logging.info('Encounter unknown request: %s.', action)
    390 
    391 
    392 class SpawningServer(object):
    393   """The class used to start/stop a http server."""
    394 
    395   def __init__(self, test_server_spawner_port, device, tool):
    396     logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    397     self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
    398                                             SpawningServerRequestHandler)
    399     self.server.device = device
    400     self.server.tool = tool
    401     self.server.test_server_instance = None
    402     self.server.build_type = constants.GetBuildType()
    403 
    404   def _Listen(self):
    405     logging.info('Starting test server spawner')
    406     self.server.serve_forever()
    407 
    408   def Start(self):
    409     """Starts the test server spawner."""
    410     listener_thread = threading.Thread(target=self._Listen)
    411     listener_thread.setDaemon(True)
    412     listener_thread.start()
    413 
    414   def Stop(self):
    415     """Stops the test server spawner.
    416 
    417     Also cleans the server state.
    418     """
    419     self.CleanupState()
    420     self.server.shutdown()
    421 
    422   def CleanupState(self):
    423     """Cleans up the spawning server state.
    424 
    425     This should be called if the test server spawner is reused,
    426     to avoid sharing the test server instance.
    427     """
    428     if self.server.test_server_instance:
    429       self.server.test_server_instance.Stop()
    430       self.server.test_server_instance = None
    431