Home | History | Annotate | Download | only in pylib
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """A "Test Server Spawner" that handles killing/stopping per-test test servers.
      6 
      7 It's used to accept requests from the device to spawn and kill instances of the
      8 chrome test server on the host.
      9 """
     10 
     11 import BaseHTTPServer
     12 import json
     13 import logging
     14 import os
     15 import select
     16 import struct
     17 import subprocess
     18 import sys
     19 import threading
     20 import time
     21 import urlparse
     22 
     23 import constants
     24 import ports
     25 
     26 from pylib.forwarder import Forwarder
     27 
     28 # Path that are needed to import necessary modules when launching a testserver.
     29 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s'
     30     % (os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'),
     31        os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
     32        os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib',
     33                     'src'),
     34        os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
     35        os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver')))
     36 
     37 
     38 SERVER_TYPES = {
     39     'http': '',
     40     'ftp': '-f',
     41     'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
     42     'tcpecho': '--tcp-echo',
     43     'udpecho': '--udp-echo',
     44 }
     45 
     46 
     47 # The timeout (in seconds) of starting up the Python test server.
     48 TEST_SERVER_STARTUP_TIMEOUT = 10
     49 
     50 def _WaitUntil(predicate, max_attempts=5):
     51   """Blocks until the provided predicate (function) is true.
     52 
     53   Returns:
     54     Whether the provided predicate was satisfied once (before the timeout).
     55   """
     56   sleep_time_sec = 0.025
     57   for attempt in xrange(1, max_attempts):
     58     if predicate():
     59       return True
     60     time.sleep(sleep_time_sec)
     61     sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
     62   return False
     63 
     64 
     65 def _CheckPortStatus(port, expected_status):
     66   """Returns True if port has expected_status.
     67 
     68   Args:
     69     port: the port number.
     70     expected_status: boolean of expected status.
     71 
     72   Returns:
     73     Returns True if the status is expected. Otherwise returns False.
     74   """
     75   return _WaitUntil(lambda: ports.IsHostPortUsed(port) == expected_status)
     76 
     77 
     78 def _CheckDevicePortStatus(adb, port):
     79   """Returns whether the provided port is used."""
     80   return _WaitUntil(lambda: ports.IsDevicePortUsed(adb, port))
     81 
     82 
     83 def _GetServerTypeCommandLine(server_type):
     84   """Returns the command-line by the given server type.
     85 
     86   Args:
     87     server_type: the server type to be used (e.g. 'http').
     88 
     89   Returns:
     90     A string containing the command-line argument.
     91   """
     92   if server_type not in SERVER_TYPES:
     93     raise NotImplementedError('Unknown server type: %s' % server_type)
     94   if server_type == 'udpecho':
     95     raise Exception('Please do not run UDP echo tests because we do not have '
     96                     'a UDP forwarder tool.')
     97   return SERVER_TYPES[server_type]
     98 
     99 
    100 class TestServerThread(threading.Thread):
    101   """A thread to run the test server in a separate process."""
    102 
    103   def __init__(self, ready_event, arguments, adb, tool, build_type):
    104     """Initialize TestServerThread with the following argument.
    105 
    106     Args:
    107       ready_event: event which will be set when the test server is ready.
    108       arguments: dictionary of arguments to run the test server.
    109       adb: instance of AndroidCommands.
    110       tool: instance of runtime error detection tool.
    111       build_type: 'Release' or 'Debug'.
    112     """
    113     threading.Thread.__init__(self)
    114     self.wait_event = threading.Event()
    115     self.stop_flag = False
    116     self.ready_event = ready_event
    117     self.ready_event.clear()
    118     self.arguments = arguments
    119     self.adb = adb
    120     self.tool = tool
    121     self.test_server_process = None
    122     self.is_ready = False
    123     self.host_port = self.arguments['port']
    124     assert isinstance(self.host_port, int)
    125     # The forwarder device port now is dynamically allocated.
    126     self.forwarder_device_port = 0
    127     # Anonymous pipe in order to get port info from test server.
    128     self.pipe_in = None
    129     self.pipe_out = None
    130     self.command_line = []
    131     self.build_type = build_type
    132 
    133   def _WaitToStartAndGetPortFromTestServer(self):
    134     """Waits for the Python test server to start and gets the port it is using.
    135 
    136     The port information is passed by the Python test server with a pipe given
    137     by self.pipe_out. It is written as a result to |self.host_port|.
    138 
    139     Returns:
    140       Whether the port used by the test server was successfully fetched.
    141     """
    142     assert self.host_port == 0 and self.pipe_out and self.pipe_in
    143     (in_fds, _, _) = select.select([self.pipe_in, ], [], [],
    144                                    TEST_SERVER_STARTUP_TIMEOUT)
    145     if len(in_fds) == 0:
    146       logging.error('Failed to wait to the Python test server to be started.')
    147       return False
    148     # First read the data length as an unsigned 4-byte value.  This
    149     # is _not_ using network byte ordering since the Python test server packs
    150     # size as native byte order and all Chromium platforms so far are
    151     # configured to use little-endian.
    152     # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    153     # use a unified byte order (either big-endian or little-endian).
    154     data_length = os.read(self.pipe_in, struct.calcsize('=L'))
    155     if data_length:
    156       (data_length,) = struct.unpack('=L', data_length)
    157       assert data_length
    158     if not data_length:
    159       logging.error('Failed to get length of server data.')
    160       return False
    161     port_json = os.read(self.pipe_in, data_length)
    162     if not port_json:
    163       logging.error('Failed to get server data.')
    164       return False
    165     logging.info('Got port json data: %s', port_json)
    166     port_json = json.loads(port_json)
    167     if port_json.has_key('port') and isinstance(port_json['port'], int):
    168       self.host_port = port_json['port']
    169       return _CheckPortStatus(self.host_port, True)
    170     logging.error('Failed to get port information from the server data.')
    171     return False
    172 
    173   def _GenerateCommandLineArguments(self):
    174     """Generates the command line to run the test server.
    175 
    176     Note that all options are processed by following the definitions in
    177     testserver.py.
    178     """
    179     if self.command_line:
    180       return
    181     # The following arguments must exist.
    182     type_cmd = _GetServerTypeCommandLine(self.arguments['server-type'])
    183     if type_cmd:
    184       self.command_line.append(type_cmd)
    185     self.command_line.append('--port=%d' % self.host_port)
    186     # Use a pipe to get the port given by the instance of Python test server
    187     # if the test does not specify the port.
    188     if self.host_port == 0:
    189       (self.pipe_in, self.pipe_out) = os.pipe()
    190       self.command_line.append('--startup-pipe=%d' % self.pipe_out)
    191     self.command_line.append('--host=%s' % self.arguments['host'])
    192     data_dir = self.arguments['data-dir'] or 'chrome/test/data'
    193     if not os.path.isabs(data_dir):
    194       data_dir = os.path.join(constants.DIR_SOURCE_ROOT, data_dir)
    195     self.command_line.append('--data-dir=%s' % data_dir)
    196     # The following arguments are optional depending on the individual test.
    197     if self.arguments.has_key('log-to-console'):
    198       self.command_line.append('--log-to-console')
    199     if self.arguments.has_key('auth-token'):
    200       self.command_line.append('--auth-token=%s' % self.arguments['auth-token'])
    201     if self.arguments.has_key('https'):
    202       self.command_line.append('--https')
    203       if self.arguments.has_key('cert-and-key-file'):
    204         self.command_line.append('--cert-and-key-file=%s' % os.path.join(
    205             constants.DIR_SOURCE_ROOT, self.arguments['cert-and-key-file']))
    206       if self.arguments.has_key('ocsp'):
    207         self.command_line.append('--ocsp=%s' % self.arguments['ocsp'])
    208       if self.arguments.has_key('https-record-resume'):
    209         self.command_line.append('--https-record-resume')
    210       if self.arguments.has_key('ssl-client-auth'):
    211         self.command_line.append('--ssl-client-auth')
    212       if self.arguments.has_key('tls-intolerant'):
    213         self.command_line.append('--tls-intolerant=%s' %
    214                                  self.arguments['tls-intolerant'])
    215       if self.arguments.has_key('ssl-client-ca'):
    216         for ca in self.arguments['ssl-client-ca']:
    217           self.command_line.append('--ssl-client-ca=%s' %
    218                                    os.path.join(constants.DIR_SOURCE_ROOT, ca))
    219       if self.arguments.has_key('ssl-bulk-cipher'):
    220         for bulk_cipher in self.arguments['ssl-bulk-cipher']:
    221           self.command_line.append('--ssl-bulk-cipher=%s' % bulk_cipher)
    222 
    223   def _CloseUnnecessaryFDsForTestServerProcess(self):
    224     # This is required to avoid subtle deadlocks that could be caused by the
    225     # test server child process inheriting undesirable file descriptors such as
    226     # file lock file descriptors.
    227     for fd in xrange(0, 1024):
    228       if fd != self.pipe_out:
    229         try:
    230           os.close(fd)
    231         except:
    232           pass
    233 
    234   def run(self):
    235     logging.info('Start running the thread!')
    236     self.wait_event.clear()
    237     self._GenerateCommandLineArguments()
    238     command = constants.DIR_SOURCE_ROOT
    239     if self.arguments['server-type'] == 'sync':
    240       command = [os.path.join(command, 'sync', 'tools', 'testserver',
    241                               'sync_testserver.py')] + self.command_line
    242     else:
    243       command = [os.path.join(command, 'net', 'tools', 'testserver',
    244                               'testserver.py')] + self.command_line
    245     logging.info('Running: %s', command)
    246     self.process = subprocess.Popen(
    247         command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess)
    248     if self.process:
    249       if self.pipe_out:
    250         self.is_ready = self._WaitToStartAndGetPortFromTestServer()
    251       else:
    252         self.is_ready = _CheckPortStatus(self.host_port, True)
    253     if self.is_ready:
    254       Forwarder.Map([(0, self.host_port)], self.adb, self.build_type, self.tool)
    255       # Check whether the forwarder is ready on the device.
    256       self.is_ready = False
    257       device_port = Forwarder.DevicePortForHostPort(self.host_port)
    258       if device_port and _CheckDevicePortStatus(self.adb, device_port):
    259         self.is_ready = True
    260         self.forwarder_device_port = device_port
    261     # Wake up the request handler thread.
    262     self.ready_event.set()
    263     # Keep thread running until Stop() gets called.
    264     _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
    265     if self.process.poll() is None:
    266       self.process.kill()
    267     Forwarder.UnmapDevicePort(self.forwarder_device_port, self.adb)
    268     self.process = None
    269     self.is_ready = False
    270     if self.pipe_out:
    271       os.close(self.pipe_in)
    272       os.close(self.pipe_out)
    273       self.pipe_in = None
    274       self.pipe_out = None
    275     logging.info('Test-server has died.')
    276     self.wait_event.set()
    277 
    278   def Stop(self):
    279     """Blocks until the loop has finished.
    280 
    281     Note that this must be called in another thread.
    282     """
    283     if not self.process:
    284       return
    285     self.stop_flag = True
    286     self.wait_event.wait()
    287 
    288 
    289 class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    290   """A handler used to process http GET/POST request."""
    291 
    292   def _SendResponse(self, response_code, response_reason, additional_headers,
    293                     contents):
    294     """Generates a response sent to the client from the provided parameters.
    295 
    296     Args:
    297       response_code: number of the response status.
    298       response_reason: string of reason description of the response.
    299       additional_headers: dict of additional headers. Each key is the name of
    300                           the header, each value is the content of the header.
    301       contents: string of the contents we want to send to client.
    302     """
    303     self.send_response(response_code, response_reason)
    304     self.send_header('Content-Type', 'text/html')
    305     # Specify the content-length as without it the http(s) response will not
    306     # be completed properly (and the browser keeps expecting data).
    307     self.send_header('Content-Length', len(contents))
    308     for header_name in additional_headers:
    309       self.send_header(header_name, additional_headers[header_name])
    310     self.end_headers()
    311     self.wfile.write(contents)
    312     self.wfile.flush()
    313 
    314   def _StartTestServer(self):
    315     """Starts the test server thread."""
    316     logging.info('Handling request to spawn a test server.')
    317     content_type = self.headers.getheader('content-type')
    318     if content_type != 'application/json':
    319       raise Exception('Bad content-type for start request.')
    320     content_length = self.headers.getheader('content-length')
    321     if not content_length:
    322       content_length = 0
    323     try:
    324       content_length = int(content_length)
    325     except:
    326       raise Exception('Bad content-length for start request.')
    327     logging.info(content_length)
    328     test_server_argument_json = self.rfile.read(content_length)
    329     logging.info(test_server_argument_json)
    330     assert not self.server.test_server_instance
    331     ready_event = threading.Event()
    332     self.server.test_server_instance = TestServerThread(
    333         ready_event,
    334         json.loads(test_server_argument_json),
    335         self.server.adb,
    336         self.server.tool,
    337         self.server.build_type)
    338     self.server.test_server_instance.setDaemon(True)
    339     self.server.test_server_instance.start()
    340     ready_event.wait()
    341     if self.server.test_server_instance.is_ready:
    342       self._SendResponse(200, 'OK', {}, json.dumps(
    343           {'port': self.server.test_server_instance.forwarder_device_port,
    344            'message': 'started'}))
    345       logging.info('Test server is running on port: %d.',
    346                    self.server.test_server_instance.host_port)
    347     else:
    348       self.server.test_server_instance.Stop()
    349       self.server.test_server_instance = None
    350       self._SendResponse(500, 'Test Server Error.', {}, '')
    351       logging.info('Encounter problem during starting a test server.')
    352 
    353   def _KillTestServer(self):
    354     """Stops the test server instance."""
    355     # There should only ever be one test server at a time. This may do the
    356     # wrong thing if we try and start multiple test servers.
    357     if not self.server.test_server_instance:
    358       return
    359     port = self.server.test_server_instance.host_port
    360     logging.info('Handling request to kill a test server on port: %d.', port)
    361     self.server.test_server_instance.Stop()
    362     # Make sure the status of test server is correct before sending response.
    363     if _CheckPortStatus(port, False):
    364       self._SendResponse(200, 'OK', {}, 'killed')
    365       logging.info('Test server on port %d is killed', port)
    366     else:
    367       self._SendResponse(500, 'Test Server Error.', {}, '')
    368       logging.info('Encounter problem during killing a test server.')
    369     self.server.test_server_instance = None
    370 
    371   def do_POST(self):
    372     parsed_path = urlparse.urlparse(self.path)
    373     action = parsed_path.path
    374     logging.info('Action for POST method is: %s.', action)
    375     if action == '/start':
    376       self._StartTestServer()
    377     else:
    378       self._SendResponse(400, 'Unknown request.', {}, '')
    379       logging.info('Encounter unknown request: %s.', action)
    380 
    381   def do_GET(self):
    382     parsed_path = urlparse.urlparse(self.path)
    383     action = parsed_path.path
    384     params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    385     logging.info('Action for GET method is: %s.', action)
    386     for param in params:
    387       logging.info('%s=%s', param, params[param][0])
    388     if action == '/kill':
    389       self._KillTestServer()
    390     elif action == '/ping':
    391       # The ping handler is used to check whether the spawner server is ready
    392       # to serve the requests. We don't need to test the status of the test
    393       # server when handling ping request.
    394       self._SendResponse(200, 'OK', {}, 'ready')
    395       logging.info('Handled ping request and sent response.')
    396     else:
    397       self._SendResponse(400, 'Unknown request', {}, '')
    398       logging.info('Encounter unknown request: %s.', action)
    399 
    400 
    401 class SpawningServer(object):
    402   """The class used to start/stop a http server."""
    403 
    404   def __init__(self, test_server_spawner_port, adb, tool, build_type):
    405     logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    406     self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
    407                                             SpawningServerRequestHandler)
    408     self.server.adb = adb
    409     self.server.tool = tool
    410     self.server.test_server_instance = None
    411     self.server.build_type = build_type
    412 
    413   def _Listen(self):
    414     logging.info('Starting test server spawner')
    415     self.server.serve_forever()
    416 
    417   def Start(self):
    418     """Starts the test server spawner."""
    419     listener_thread = threading.Thread(target=self._Listen)
    420     listener_thread.setDaemon(True)
    421     listener_thread.start()
    422 
    423   def Stop(self):
    424     """Stops the test server spawner.
    425 
    426     Also cleans the server state.
    427     """
    428     self.CleanupState()
    429     self.server.shutdown()
    430 
    431   def CleanupState(self):
    432     """Cleans up the spawning server state.
    433 
    434     This should be called if the test server spawner is reused,
    435     to avoid sharing the test server instance.
    436     """
    437     if self.server.test_server_instance:
    438       self.server.test_server_instance.Stop()
    439       self.server.test_server_instance = None
    440