Home | History | Annotate | Download | only in pylib
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """A "Test Server Spawner" that handles killing/stopping per-test test servers.
      6 
      7 It's used to accept requests from the device to spawn and kill instances of the
      8 chrome test server on the host.
      9 """
     10 
     11 import BaseHTTPServer
     12 import json
     13 import logging
     14 import os
     15 import select
     16 import struct
     17 import subprocess
     18 import sys
     19 import threading
     20 import time
     21 import urlparse
     22 
     23 import constants
     24 import ports
     25 
     26 from pylib.forwarder import Forwarder
     27 
     28 # Path that are needed to import necessary modules when launching a testserver.
     29 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s'
     30     % (os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'),
     31        os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
     32        os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib',
     33                     'src'),
     34        os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
     35        os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver')))
     36 
     37 
# Maps each known server type to the command-line flag that selects it. An
# empty string means that no flag is added to the command line for that type.
SERVER_TYPES = {
    'http': '',
    'ftp': '-f',
    'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
    'tcpecho': '--tcp-echo',
    'udpecho': '--udp-echo',
}


# The timeout (in seconds) of starting up the Python test server, i.e. how long
# to wait for it to report its port through the startup pipe.
TEST_SERVER_STARTUP_TIMEOUT = 10
     49 
     50 def _WaitUntil(predicate, max_attempts=5):
     51   """Blocks until the provided predicate (function) is true.
     52 
     53   Returns:
     54     Whether the provided predicate was satisfied once (before the timeout).
     55   """
     56   sleep_time_sec = 0.025
     57   for attempt in xrange(1, max_attempts):
     58     if predicate():
     59       return True
     60     time.sleep(sleep_time_sec)
     61     sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
     62   return False
     63 
     64 
     65 def _CheckPortStatus(port, expected_status):
     66   """Returns True if port has expected_status.
     67 
     68   Args:
     69     port: the port number.
     70     expected_status: boolean of expected status.
     71 
     72   Returns:
     73     Returns True if the status is expected. Otherwise returns False.
     74   """
     75   return _WaitUntil(lambda: ports.IsHostPortUsed(port) == expected_status)
     76 
     77 
     78 def _CheckDevicePortStatus(adb, port):
     79   """Returns whether the provided port is used."""
     80   return _WaitUntil(lambda: ports.IsDevicePortUsed(adb, port))
     81 
     82 
     83 def _GetServerTypeCommandLine(server_type):
     84   """Returns the command-line by the given server type.
     85 
     86   Args:
     87     server_type: the server type to be used (e.g. 'http').
     88 
     89   Returns:
     90     A string containing the command-line argument.
     91   """
     92   if server_type not in SERVER_TYPES:
     93     raise NotImplementedError('Unknown server type: %s' % server_type)
     94   if server_type == 'udpecho':
     95     raise Exception('Please do not run UDP echo tests because we do not have '
     96                     'a UDP forwarder tool.')
     97   return SERVER_TYPES[server_type]
     98 
     99 
    100 class TestServerThread(threading.Thread):
    101   """A thread to run the test server in a separate process."""
    102 
    103   def __init__(self, ready_event, arguments, adb, tool):
    104     """Initialize TestServerThread with the following argument.
    105 
    106     Args:
    107       ready_event: event which will be set when the test server is ready.
    108       arguments: dictionary of arguments to run the test server.
    109       adb: instance of AndroidCommands.
    110       tool: instance of runtime error detection tool.
    111     """
    112     threading.Thread.__init__(self)
    113     self.wait_event = threading.Event()
    114     self.stop_flag = False
    115     self.ready_event = ready_event
    116     self.ready_event.clear()
    117     self.arguments = arguments
    118     self.adb = adb
    119     self.tool = tool
    120     self.test_server_process = None
    121     self.is_ready = False
    122     self.host_port = self.arguments['port']
    123     assert isinstance(self.host_port, int)
    124     # The forwarder device port now is dynamically allocated.
    125     self.forwarder_device_port = 0
    126     # Anonymous pipe in order to get port info from test server.
    127     self.pipe_in = None
    128     self.pipe_out = None
    129     self.command_line = []
    130 
    131   def _WaitToStartAndGetPortFromTestServer(self):
    132     """Waits for the Python test server to start and gets the port it is using.
    133 
    134     The port information is passed by the Python test server with a pipe given
    135     by self.pipe_out. It is written as a result to |self.host_port|.
    136 
    137     Returns:
    138       Whether the port used by the test server was successfully fetched.
    139     """
    140     assert self.host_port == 0 and self.pipe_out and self.pipe_in
    141     (in_fds, _, _) = select.select([self.pipe_in, ], [], [],
    142                                    TEST_SERVER_STARTUP_TIMEOUT)
    143     if len(in_fds) == 0:
    144       logging.error('Failed to wait to the Python test server to be started.')
    145       return False
    146     # First read the data length as an unsigned 4-byte value.  This
    147     # is _not_ using network byte ordering since the Python test server packs
    148     # size as native byte order and all Chromium platforms so far are
    149     # configured to use little-endian.
    150     # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    151     # use a unified byte order (either big-endian or little-endian).
    152     data_length = os.read(self.pipe_in, struct.calcsize('=L'))
    153     if data_length:
    154       (data_length,) = struct.unpack('=L', data_length)
    155       assert data_length
    156     if not data_length:
    157       logging.error('Failed to get length of server data.')
    158       return False
    159     port_json = os.read(self.pipe_in, data_length)
    160     if not port_json:
    161       logging.error('Failed to get server data.')
    162       return False
    163     logging.info('Got port json data: %s', port_json)
    164     port_json = json.loads(port_json)
    165     if port_json.has_key('port') and isinstance(port_json['port'], int):
    166       self.host_port = port_json['port']
    167       return _CheckPortStatus(self.host_port, True)
    168     logging.error('Failed to get port information from the server data.')
    169     return False
    170 
    171   def _GenerateCommandLineArguments(self):
    172     """Generates the command line to run the test server.
    173 
    174     Note that all options are processed by following the definitions in
    175     testserver.py.
    176     """
    177     if self.command_line:
    178       return
    179     # The following arguments must exist.
    180     type_cmd = _GetServerTypeCommandLine(self.arguments['server-type'])
    181     if type_cmd:
    182       self.command_line.append(type_cmd)
    183     self.command_line.append('--port=%d' % self.host_port)
    184     # Use a pipe to get the port given by the instance of Python test server
    185     # if the test does not specify the port.
    186     if self.host_port == 0:
    187       (self.pipe_in, self.pipe_out) = os.pipe()
    188       self.command_line.append('--startup-pipe=%d' % self.pipe_out)
    189     self.command_line.append('--host=%s' % self.arguments['host'])
    190     data_dir = self.arguments['data-dir'] or 'chrome/test/data'
    191     if not os.path.isabs(data_dir):
    192       data_dir = os.path.join(constants.DIR_SOURCE_ROOT, data_dir)
    193     self.command_line.append('--data-dir=%s' % data_dir)
    194     # The following arguments are optional depending on the individual test.
    195     if self.arguments.has_key('log-to-console'):
    196       self.command_line.append('--log-to-console')
    197     if self.arguments.has_key('auth-token'):
    198       self.command_line.append('--auth-token=%s' % self.arguments['auth-token'])
    199     if self.arguments.has_key('https'):
    200       self.command_line.append('--https')
    201       if self.arguments.has_key('cert-and-key-file'):
    202         self.command_line.append('--cert-and-key-file=%s' % os.path.join(
    203             constants.DIR_SOURCE_ROOT, self.arguments['cert-and-key-file']))
    204       if self.arguments.has_key('ocsp'):
    205         self.command_line.append('--ocsp=%s' % self.arguments['ocsp'])
    206       if self.arguments.has_key('https-record-resume'):
    207         self.command_line.append('--https-record-resume')
    208       if self.arguments.has_key('ssl-client-auth'):
    209         self.command_line.append('--ssl-client-auth')
    210       if self.arguments.has_key('tls-intolerant'):
    211         self.command_line.append('--tls-intolerant=%s' %
    212                                  self.arguments['tls-intolerant'])
    213       if self.arguments.has_key('ssl-client-ca'):
    214         for ca in self.arguments['ssl-client-ca']:
    215           self.command_line.append('--ssl-client-ca=%s' %
    216                                    os.path.join(constants.DIR_SOURCE_ROOT, ca))
    217       if self.arguments.has_key('ssl-bulk-cipher'):
    218         for bulk_cipher in self.arguments['ssl-bulk-cipher']:
    219           self.command_line.append('--ssl-bulk-cipher=%s' % bulk_cipher)
    220 
    221   def _CloseUnnecessaryFDsForTestServerProcess(self):
    222     # This is required to avoid subtle deadlocks that could be caused by the
    223     # test server child process inheriting undesirable file descriptors such as
    224     # file lock file descriptors.
    225     for fd in xrange(0, 1024):
    226       if fd != self.pipe_out:
    227         try:
    228           os.close(fd)
    229         except:
    230           pass
    231 
    232   def run(self):
    233     logging.info('Start running the thread!')
    234     self.wait_event.clear()
    235     self._GenerateCommandLineArguments()
    236     command = constants.DIR_SOURCE_ROOT
    237     if self.arguments['server-type'] == 'sync':
    238       command = [os.path.join(command, 'sync', 'tools', 'testserver',
    239                               'sync_testserver.py')] + self.command_line
    240     else:
    241       command = [os.path.join(command, 'net', 'tools', 'testserver',
    242                               'testserver.py')] + self.command_line
    243     logging.info('Running: %s', command)
    244     self.process = subprocess.Popen(
    245         command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess)
    246     if self.process:
    247       if self.pipe_out:
    248         self.is_ready = self._WaitToStartAndGetPortFromTestServer()
    249       else:
    250         self.is_ready = _CheckPortStatus(self.host_port, True)
    251     if self.is_ready:
    252       Forwarder.Map([(0, self.host_port)], self.adb, self.tool)
    253       # Check whether the forwarder is ready on the device.
    254       self.is_ready = False
    255       device_port = Forwarder.DevicePortForHostPort(self.host_port)
    256       if device_port and _CheckDevicePortStatus(self.adb, device_port):
    257         self.is_ready = True
    258         self.forwarder_device_port = device_port
    259     # Wake up the request handler thread.
    260     self.ready_event.set()
    261     # Keep thread running until Stop() gets called.
    262     _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
    263     if self.process.poll() is None:
    264       self.process.kill()
    265     Forwarder.UnmapDevicePort(self.forwarder_device_port, self.adb)
    266     self.process = None
    267     self.is_ready = False
    268     if self.pipe_out:
    269       os.close(self.pipe_in)
    270       os.close(self.pipe_out)
    271       self.pipe_in = None
    272       self.pipe_out = None
    273     logging.info('Test-server has died.')
    274     self.wait_event.set()
    275 
    276   def Stop(self):
    277     """Blocks until the loop has finished.
    278 
    279     Note that this must be called in another thread.
    280     """
    281     if not self.process:
    282       return
    283     self.stop_flag = True
    284     self.wait_event.wait()
    285 
    286 
    287 class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    288   """A handler used to process http GET/POST request."""
    289 
    290   def _SendResponse(self, response_code, response_reason, additional_headers,
    291                     contents):
    292     """Generates a response sent to the client from the provided parameters.
    293 
    294     Args:
    295       response_code: number of the response status.
    296       response_reason: string of reason description of the response.
    297       additional_headers: dict of additional headers. Each key is the name of
    298                           the header, each value is the content of the header.
    299       contents: string of the contents we want to send to client.
    300     """
    301     self.send_response(response_code, response_reason)
    302     self.send_header('Content-Type', 'text/html')
    303     # Specify the content-length as without it the http(s) response will not
    304     # be completed properly (and the browser keeps expecting data).
    305     self.send_header('Content-Length', len(contents))
    306     for header_name in additional_headers:
    307       self.send_header(header_name, additional_headers[header_name])
    308     self.end_headers()
    309     self.wfile.write(contents)
    310     self.wfile.flush()
    311 
    312   def _StartTestServer(self):
    313     """Starts the test server thread."""
    314     logging.info('Handling request to spawn a test server.')
    315     content_type = self.headers.getheader('content-type')
    316     if content_type != 'application/json':
    317       raise Exception('Bad content-type for start request.')
    318     content_length = self.headers.getheader('content-length')
    319     if not content_length:
    320       content_length = 0
    321     try:
    322       content_length = int(content_length)
    323     except:
    324       raise Exception('Bad content-length for start request.')
    325     logging.info(content_length)
    326     test_server_argument_json = self.rfile.read(content_length)
    327     logging.info(test_server_argument_json)
    328     assert not self.server.test_server_instance
    329     ready_event = threading.Event()
    330     self.server.test_server_instance = TestServerThread(
    331         ready_event,
    332         json.loads(test_server_argument_json),
    333         self.server.adb,
    334         self.server.tool)
    335     self.server.test_server_instance.setDaemon(True)
    336     self.server.test_server_instance.start()
    337     ready_event.wait()
    338     if self.server.test_server_instance.is_ready:
    339       self._SendResponse(200, 'OK', {}, json.dumps(
    340           {'port': self.server.test_server_instance.forwarder_device_port,
    341            'message': 'started'}))
    342       logging.info('Test server is running on port: %d.',
    343                    self.server.test_server_instance.host_port)
    344     else:
    345       self.server.test_server_instance.Stop()
    346       self.server.test_server_instance = None
    347       self._SendResponse(500, 'Test Server Error.', {}, '')
    348       logging.info('Encounter problem during starting a test server.')
    349 
    350   def _KillTestServer(self):
    351     """Stops the test server instance."""
    352     # There should only ever be one test server at a time. This may do the
    353     # wrong thing if we try and start multiple test servers.
    354     if not self.server.test_server_instance:
    355       return
    356     port = self.server.test_server_instance.host_port
    357     logging.info('Handling request to kill a test server on port: %d.', port)
    358     self.server.test_server_instance.Stop()
    359     # Make sure the status of test server is correct before sending response.
    360     if _CheckPortStatus(port, False):
    361       self._SendResponse(200, 'OK', {}, 'killed')
    362       logging.info('Test server on port %d is killed', port)
    363     else:
    364       self._SendResponse(500, 'Test Server Error.', {}, '')
    365       logging.info('Encounter problem during killing a test server.')
    366     self.server.test_server_instance = None
    367 
    368   def do_POST(self):
    369     parsed_path = urlparse.urlparse(self.path)
    370     action = parsed_path.path
    371     logging.info('Action for POST method is: %s.', action)
    372     if action == '/start':
    373       self._StartTestServer()
    374     else:
    375       self._SendResponse(400, 'Unknown request.', {}, '')
    376       logging.info('Encounter unknown request: %s.', action)
    377 
    378   def do_GET(self):
    379     parsed_path = urlparse.urlparse(self.path)
    380     action = parsed_path.path
    381     params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    382     logging.info('Action for GET method is: %s.', action)
    383     for param in params:
    384       logging.info('%s=%s', param, params[param][0])
    385     if action == '/kill':
    386       self._KillTestServer()
    387     elif action == '/ping':
    388       # The ping handler is used to check whether the spawner server is ready
    389       # to serve the requests. We don't need to test the status of the test
    390       # server when handling ping request.
    391       self._SendResponse(200, 'OK', {}, 'ready')
    392       logging.info('Handled ping request and sent response.')
    393     else:
    394       self._SendResponse(400, 'Unknown request', {}, '')
    395       logging.info('Encounter unknown request: %s.', action)
    396 
    397 
    398 class SpawningServer(object):
    399   """The class used to start/stop a http server."""
    400 
    401   def __init__(self, test_server_spawner_port, adb, tool):
    402     logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    403     self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
    404                                             SpawningServerRequestHandler)
    405     self.server.adb = adb
    406     self.server.tool = tool
    407     self.server.test_server_instance = None
    408     self.server.build_type = constants.GetBuildType()
    409 
    410   def _Listen(self):
    411     logging.info('Starting test server spawner')
    412     self.server.serve_forever()
    413 
    414   def Start(self):
    415     """Starts the test server spawner."""
    416     listener_thread = threading.Thread(target=self._Listen)
    417     listener_thread.setDaemon(True)
    418     listener_thread.start()
    419 
    420   def Stop(self):
    421     """Stops the test server spawner.
    422 
    423     Also cleans the server state.
    424     """
    425     self.CleanupState()
    426     self.server.shutdown()
    427 
    428   def CleanupState(self):
    429     """Cleans up the spawning server state.
    430 
    431     This should be called if the test server spawner is reused,
    432     to avoid sharing the test server instance.
    433     """
    434     if self.server.test_server_instance:
    435       self.server.test_server_instance.Stop()
    436       self.server.test_server_instance = None
    437