1 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 """A "Test Server Spawner" that handles killing/stopping per-test test servers. 6 7 It's used to accept requests from the device to spawn and kill instances of the 8 chrome test server on the host. 9 """ 10 11 import BaseHTTPServer 12 import json 13 import logging 14 import os 15 import select 16 import struct 17 import subprocess 18 import sys 19 import threading 20 import time 21 import urlparse 22 23 import constants 24 import ports 25 26 from pylib.forwarder import Forwarder 27 28 # Path that are needed to import necessary modules when launching a testserver. 29 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s' 30 % (os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'), 31 os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'), 32 os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib', 33 'src'), 34 os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'), 35 os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver'))) 36 37 38 SERVER_TYPES = { 39 'http': '', 40 'ftp': '-f', 41 'sync': '', # Sync uses its own script, and doesn't take a server type arg. 42 'tcpecho': '--tcp-echo', 43 'udpecho': '--udp-echo', 44 } 45 46 47 # The timeout (in seconds) of starting up the Python test server. 48 TEST_SERVER_STARTUP_TIMEOUT = 10 49 50 def _WaitUntil(predicate, max_attempts=5): 51 """Blocks until the provided predicate (function) is true. 52 53 Returns: 54 Whether the provided predicate was satisfied once (before the timeout). 55 """ 56 sleep_time_sec = 0.025 57 for attempt in xrange(1, max_attempts): 58 if predicate(): 59 return True 60 time.sleep(sleep_time_sec) 61 sleep_time_sec = min(1, sleep_time_sec * 2) # Don't wait more than 1 sec. 62 return False 63 64 65 def _CheckPortStatus(port, expected_status): 66 """Returns True if port has expected_status. 67 68 Args: 69 port: the port number. 70 expected_status: boolean of expected status. 71 72 Returns: 73 Returns True if the status is expected. Otherwise returns False. 74 """ 75 return _WaitUntil(lambda: ports.IsHostPortUsed(port) == expected_status) 76 77 78 def _CheckDevicePortStatus(adb, port): 79 """Returns whether the provided port is used.""" 80 return _WaitUntil(lambda: ports.IsDevicePortUsed(adb, port)) 81 82 83 def _GetServerTypeCommandLine(server_type): 84 """Returns the command-line by the given server type. 85 86 Args: 87 server_type: the server type to be used (e.g. 'http'). 88 89 Returns: 90 A string containing the command-line argument. 91 """ 92 if server_type not in SERVER_TYPES: 93 raise NotImplementedError('Unknown server type: %s' % server_type) 94 if server_type == 'udpecho': 95 raise Exception('Please do not run UDP echo tests because we do not have ' 96 'a UDP forwarder tool.') 97 return SERVER_TYPES[server_type] 98 99 100 class TestServerThread(threading.Thread): 101 """A thread to run the test server in a separate process.""" 102 103 def __init__(self, ready_event, arguments, adb, tool, build_type): 104 """Initialize TestServerThread with the following argument. 105 106 Args: 107 ready_event: event which will be set when the test server is ready. 108 arguments: dictionary of arguments to run the test server. 109 adb: instance of AndroidCommands. 110 tool: instance of runtime error detection tool. 111 build_type: 'Release' or 'Debug'. 112 """ 113 threading.Thread.__init__(self) 114 self.wait_event = threading.Event() 115 self.stop_flag = False 116 self.ready_event = ready_event 117 self.ready_event.clear() 118 self.arguments = arguments 119 self.adb = adb 120 self.tool = tool 121 self.test_server_process = None 122 self.is_ready = False 123 self.host_port = self.arguments['port'] 124 assert isinstance(self.host_port, int) 125 # The forwarder device port now is dynamically allocated. 126 self.forwarder_device_port = 0 127 # Anonymous pipe in order to get port info from test server. 128 self.pipe_in = None 129 self.pipe_out = None 130 self.command_line = [] 131 self.build_type = build_type 132 133 def _WaitToStartAndGetPortFromTestServer(self): 134 """Waits for the Python test server to start and gets the port it is using. 135 136 The port information is passed by the Python test server with a pipe given 137 by self.pipe_out. It is written as a result to |self.host_port|. 138 139 Returns: 140 Whether the port used by the test server was successfully fetched. 141 """ 142 assert self.host_port == 0 and self.pipe_out and self.pipe_in 143 (in_fds, _, _) = select.select([self.pipe_in, ], [], [], 144 TEST_SERVER_STARTUP_TIMEOUT) 145 if len(in_fds) == 0: 146 logging.error('Failed to wait to the Python test server to be started.') 147 return False 148 # First read the data length as an unsigned 4-byte value. This 149 # is _not_ using network byte ordering since the Python test server packs 150 # size as native byte order and all Chromium platforms so far are 151 # configured to use little-endian. 152 # TODO(jnd): Change the Python test server and local_test_server_*.cc to 153 # use a unified byte order (either big-endian or little-endian). 154 data_length = os.read(self.pipe_in, struct.calcsize('=L')) 155 if data_length: 156 (data_length,) = struct.unpack('=L', data_length) 157 assert data_length 158 if not data_length: 159 logging.error('Failed to get length of server data.') 160 return False 161 port_json = os.read(self.pipe_in, data_length) 162 if not port_json: 163 logging.error('Failed to get server data.') 164 return False 165 logging.info('Got port json data: %s', port_json) 166 port_json = json.loads(port_json) 167 if port_json.has_key('port') and isinstance(port_json['port'], int): 168 self.host_port = port_json['port'] 169 return _CheckPortStatus(self.host_port, True) 170 logging.error('Failed to get port information from the server data.') 171 return False 172 173 def _GenerateCommandLineArguments(self): 174 """Generates the command line to run the test server. 175 176 Note that all options are processed by following the definitions in 177 testserver.py. 178 """ 179 if self.command_line: 180 return 181 # The following arguments must exist. 182 type_cmd = _GetServerTypeCommandLine(self.arguments['server-type']) 183 if type_cmd: 184 self.command_line.append(type_cmd) 185 self.command_line.append('--port=%d' % self.host_port) 186 # Use a pipe to get the port given by the instance of Python test server 187 # if the test does not specify the port. 188 if self.host_port == 0: 189 (self.pipe_in, self.pipe_out) = os.pipe() 190 self.command_line.append('--startup-pipe=%d' % self.pipe_out) 191 self.command_line.append('--host=%s' % self.arguments['host']) 192 data_dir = self.arguments['data-dir'] or 'chrome/test/data' 193 if not os.path.isabs(data_dir): 194 data_dir = os.path.join(constants.DIR_SOURCE_ROOT, data_dir) 195 self.command_line.append('--data-dir=%s' % data_dir) 196 # The following arguments are optional depending on the individual test. 197 if self.arguments.has_key('log-to-console'): 198 self.command_line.append('--log-to-console') 199 if self.arguments.has_key('auth-token'): 200 self.command_line.append('--auth-token=%s' % self.arguments['auth-token']) 201 if self.arguments.has_key('https'): 202 self.command_line.append('--https') 203 if self.arguments.has_key('cert-and-key-file'): 204 self.command_line.append('--cert-and-key-file=%s' % os.path.join( 205 constants.DIR_SOURCE_ROOT, self.arguments['cert-and-key-file'])) 206 if self.arguments.has_key('ocsp'): 207 self.command_line.append('--ocsp=%s' % self.arguments['ocsp']) 208 if self.arguments.has_key('https-record-resume'): 209 self.command_line.append('--https-record-resume') 210 if self.arguments.has_key('ssl-client-auth'): 211 self.command_line.append('--ssl-client-auth') 212 if self.arguments.has_key('tls-intolerant'): 213 self.command_line.append('--tls-intolerant=%s' % 214 self.arguments['tls-intolerant']) 215 if self.arguments.has_key('ssl-client-ca'): 216 for ca in self.arguments['ssl-client-ca']: 217 self.command_line.append('--ssl-client-ca=%s' % 218 os.path.join(constants.DIR_SOURCE_ROOT, ca)) 219 if self.arguments.has_key('ssl-bulk-cipher'): 220 for bulk_cipher in self.arguments['ssl-bulk-cipher']: 221 self.command_line.append('--ssl-bulk-cipher=%s' % bulk_cipher) 222 223 def _CloseUnnecessaryFDsForTestServerProcess(self): 224 # This is required to avoid subtle deadlocks that could be caused by the 225 # test server child process inheriting undesirable file descriptors such as 226 # file lock file descriptors. 227 for fd in xrange(0, 1024): 228 if fd != self.pipe_out: 229 try: 230 os.close(fd) 231 except: 232 pass 233 234 def run(self): 235 logging.info('Start running the thread!') 236 self.wait_event.clear() 237 self._GenerateCommandLineArguments() 238 command = constants.DIR_SOURCE_ROOT 239 if self.arguments['server-type'] == 'sync': 240 command = [os.path.join(command, 'sync', 'tools', 'testserver', 241 'sync_testserver.py')] + self.command_line 242 else: 243 command = [os.path.join(command, 'net', 'tools', 'testserver', 244 'testserver.py')] + self.command_line 245 logging.info('Running: %s', command) 246 self.process = subprocess.Popen( 247 command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess) 248 if self.process: 249 if self.pipe_out: 250 self.is_ready = self._WaitToStartAndGetPortFromTestServer() 251 else: 252 self.is_ready = _CheckPortStatus(self.host_port, True) 253 if self.is_ready: 254 Forwarder.Map([(0, self.host_port)], self.adb, self.build_type, self.tool) 255 # Check whether the forwarder is ready on the device. 256 self.is_ready = False 257 device_port = Forwarder.DevicePortForHostPort(self.host_port) 258 if device_port and _CheckDevicePortStatus(self.adb, device_port): 259 self.is_ready = True 260 self.forwarder_device_port = device_port 261 # Wake up the request handler thread. 262 self.ready_event.set() 263 # Keep thread running until Stop() gets called. 264 _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint) 265 if self.process.poll() is None: 266 self.process.kill() 267 Forwarder.UnmapDevicePort(self.forwarder_device_port, self.adb) 268 self.process = None 269 self.is_ready = False 270 if self.pipe_out: 271 os.close(self.pipe_in) 272 os.close(self.pipe_out) 273 self.pipe_in = None 274 self.pipe_out = None 275 logging.info('Test-server has died.') 276 self.wait_event.set() 277 278 def Stop(self): 279 """Blocks until the loop has finished. 280 281 Note that this must be called in another thread. 282 """ 283 if not self.process: 284 return 285 self.stop_flag = True 286 self.wait_event.wait() 287 288 289 class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): 290 """A handler used to process http GET/POST request.""" 291 292 def _SendResponse(self, response_code, response_reason, additional_headers, 293 contents): 294 """Generates a response sent to the client from the provided parameters. 295 296 Args: 297 response_code: number of the response status. 298 response_reason: string of reason description of the response. 299 additional_headers: dict of additional headers. Each key is the name of 300 the header, each value is the content of the header. 301 contents: string of the contents we want to send to client. 302 """ 303 self.send_response(response_code, response_reason) 304 self.send_header('Content-Type', 'text/html') 305 # Specify the content-length as without it the http(s) response will not 306 # be completed properly (and the browser keeps expecting data). 307 self.send_header('Content-Length', len(contents)) 308 for header_name in additional_headers: 309 self.send_header(header_name, additional_headers[header_name]) 310 self.end_headers() 311 self.wfile.write(contents) 312 self.wfile.flush() 313 314 def _StartTestServer(self): 315 """Starts the test server thread.""" 316 logging.info('Handling request to spawn a test server.') 317 content_type = self.headers.getheader('content-type') 318 if content_type != 'application/json': 319 raise Exception('Bad content-type for start request.') 320 content_length = self.headers.getheader('content-length') 321 if not content_length: 322 content_length = 0 323 try: 324 content_length = int(content_length) 325 except: 326 raise Exception('Bad content-length for start request.') 327 logging.info(content_length) 328 test_server_argument_json = self.rfile.read(content_length) 329 logging.info(test_server_argument_json) 330 assert not self.server.test_server_instance 331 ready_event = threading.Event() 332 self.server.test_server_instance = TestServerThread( 333 ready_event, 334 json.loads(test_server_argument_json), 335 self.server.adb, 336 self.server.tool, 337 self.server.build_type) 338 self.server.test_server_instance.setDaemon(True) 339 self.server.test_server_instance.start() 340 ready_event.wait() 341 if self.server.test_server_instance.is_ready: 342 self._SendResponse(200, 'OK', {}, json.dumps( 343 {'port': self.server.test_server_instance.forwarder_device_port, 344 'message': 'started'})) 345 logging.info('Test server is running on port: %d.', 346 self.server.test_server_instance.host_port) 347 else: 348 self.server.test_server_instance.Stop() 349 self.server.test_server_instance = None 350 self._SendResponse(500, 'Test Server Error.', {}, '') 351 logging.info('Encounter problem during starting a test server.') 352 353 def _KillTestServer(self): 354 """Stops the test server instance.""" 355 # There should only ever be one test server at a time. This may do the 356 # wrong thing if we try and start multiple test servers. 357 if not self.server.test_server_instance: 358 return 359 port = self.server.test_server_instance.host_port 360 logging.info('Handling request to kill a test server on port: %d.', port) 361 self.server.test_server_instance.Stop() 362 # Make sure the status of test server is correct before sending response. 363 if _CheckPortStatus(port, False): 364 self._SendResponse(200, 'OK', {}, 'killed') 365 logging.info('Test server on port %d is killed', port) 366 else: 367 self._SendResponse(500, 'Test Server Error.', {}, '') 368 logging.info('Encounter problem during killing a test server.') 369 self.server.test_server_instance = None 370 371 def do_POST(self): 372 parsed_path = urlparse.urlparse(self.path) 373 action = parsed_path.path 374 logging.info('Action for POST method is: %s.', action) 375 if action == '/start': 376 self._StartTestServer() 377 else: 378 self._SendResponse(400, 'Unknown request.', {}, '') 379 logging.info('Encounter unknown request: %s.', action) 380 381 def do_GET(self): 382 parsed_path = urlparse.urlparse(self.path) 383 action = parsed_path.path 384 params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1) 385 logging.info('Action for GET method is: %s.', action) 386 for param in params: 387 logging.info('%s=%s', param, params[param][0]) 388 if action == '/kill': 389 self._KillTestServer() 390 elif action == '/ping': 391 # The ping handler is used to check whether the spawner server is ready 392 # to serve the requests. We don't need to test the status of the test 393 # server when handling ping request. 394 self._SendResponse(200, 'OK', {}, 'ready') 395 logging.info('Handled ping request and sent response.') 396 else: 397 self._SendResponse(400, 'Unknown request', {}, '') 398 logging.info('Encounter unknown request: %s.', action) 399 400 401 class SpawningServer(object): 402 """The class used to start/stop a http server.""" 403 404 def __init__(self, test_server_spawner_port, adb, tool, build_type): 405 logging.info('Creating new spawner on port: %d.', test_server_spawner_port) 406 self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port), 407 SpawningServerRequestHandler) 408 self.server.adb = adb 409 self.server.tool = tool 410 self.server.test_server_instance = None 411 self.server.build_type = build_type 412 413 def _Listen(self): 414 logging.info('Starting test server spawner') 415 self.server.serve_forever() 416 417 def Start(self): 418 """Starts the test server spawner.""" 419 listener_thread = threading.Thread(target=self._Listen) 420 listener_thread.setDaemon(True) 421 listener_thread.start() 422 423 def Stop(self): 424 """Stops the test server spawner. 425 426 Also cleans the server state. 427 """ 428 self.CleanupState() 429 self.server.shutdown() 430 431 def CleanupState(self): 432 """Cleans up the spawning server state. 433 434 This should be called if the test server spawner is reused, 435 to avoid sharing the test server instance. 436 """ 437 if self.server.test_server_instance: 438 self.server.test_server_instance.Stop() 439 self.server.test_server_instance = None 440