1 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 """A "Test Server Spawner" that handles killing/stopping per-test test servers. 6 7 It's used to accept requests from the device to spawn and kill instances of the 8 chrome test server on the host. 9 """ 10 # pylint: disable=W0702 11 12 import BaseHTTPServer 13 import json 14 import logging 15 import os 16 import select 17 import struct 18 import subprocess 19 import sys 20 import threading 21 import time 22 import urlparse 23 24 from pylib import constants 25 from pylib import ports 26 27 from pylib.forwarder import Forwarder 28 29 30 # Path that are needed to import necessary modules when launching a testserver. 31 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s' 32 % (os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'), 33 os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'), 34 os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib', 35 'src'), 36 os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'), 37 os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver'))) 38 39 40 SERVER_TYPES = { 41 'http': '', 42 'ftp': '-f', 43 'sync': '', # Sync uses its own script, and doesn't take a server type arg. 44 'tcpecho': '--tcp-echo', 45 'udpecho': '--udp-echo', 46 } 47 48 49 # The timeout (in seconds) of starting up the Python test server. 50 TEST_SERVER_STARTUP_TIMEOUT = 10 51 52 def _WaitUntil(predicate, max_attempts=5): 53 """Blocks until the provided predicate (function) is true. 54 55 Returns: 56 Whether the provided predicate was satisfied once (before the timeout). 57 """ 58 sleep_time_sec = 0.025 59 for _ in xrange(1, max_attempts): 60 if predicate(): 61 return True 62 time.sleep(sleep_time_sec) 63 sleep_time_sec = min(1, sleep_time_sec * 2) # Don't wait more than 1 sec. 64 return False 65 66 67 def _CheckPortStatus(port, expected_status): 68 """Returns True if port has expected_status. 69 70 Args: 71 port: the port number. 72 expected_status: boolean of expected status. 73 74 Returns: 75 Returns True if the status is expected. Otherwise returns False. 76 """ 77 return _WaitUntil(lambda: ports.IsHostPortUsed(port) == expected_status) 78 79 80 def _CheckDevicePortStatus(device, port): 81 """Returns whether the provided port is used.""" 82 return _WaitUntil(lambda: ports.IsDevicePortUsed(device, port)) 83 84 85 def _GetServerTypeCommandLine(server_type): 86 """Returns the command-line by the given server type. 87 88 Args: 89 server_type: the server type to be used (e.g. 'http'). 90 91 Returns: 92 A string containing the command-line argument. 93 """ 94 if server_type not in SERVER_TYPES: 95 raise NotImplementedError('Unknown server type: %s' % server_type) 96 if server_type == 'udpecho': 97 raise Exception('Please do not run UDP echo tests because we do not have ' 98 'a UDP forwarder tool.') 99 return SERVER_TYPES[server_type] 100 101 102 class TestServerThread(threading.Thread): 103 """A thread to run the test server in a separate process.""" 104 105 def __init__(self, ready_event, arguments, device, tool): 106 """Initialize TestServerThread with the following argument. 107 108 Args: 109 ready_event: event which will be set when the test server is ready. 110 arguments: dictionary of arguments to run the test server. 111 device: An instance of DeviceUtils. 112 tool: instance of runtime error detection tool. 113 """ 114 threading.Thread.__init__(self) 115 self.wait_event = threading.Event() 116 self.stop_flag = False 117 self.ready_event = ready_event 118 self.ready_event.clear() 119 self.arguments = arguments 120 self.device = device 121 self.tool = tool 122 self.test_server_process = None 123 self.is_ready = False 124 self.host_port = self.arguments['port'] 125 assert isinstance(self.host_port, int) 126 # The forwarder device port now is dynamically allocated. 127 self.forwarder_device_port = 0 128 # Anonymous pipe in order to get port info from test server. 129 self.pipe_in = None 130 self.pipe_out = None 131 self.process = None 132 self.command_line = [] 133 134 def _WaitToStartAndGetPortFromTestServer(self): 135 """Waits for the Python test server to start and gets the port it is using. 136 137 The port information is passed by the Python test server with a pipe given 138 by self.pipe_out. It is written as a result to |self.host_port|. 139 140 Returns: 141 Whether the port used by the test server was successfully fetched. 142 """ 143 assert self.host_port == 0 and self.pipe_out and self.pipe_in 144 (in_fds, _, _) = select.select([self.pipe_in, ], [], [], 145 TEST_SERVER_STARTUP_TIMEOUT) 146 if len(in_fds) == 0: 147 logging.error('Failed to wait to the Python test server to be started.') 148 return False 149 # First read the data length as an unsigned 4-byte value. This 150 # is _not_ using network byte ordering since the Python test server packs 151 # size as native byte order and all Chromium platforms so far are 152 # configured to use little-endian. 153 # TODO(jnd): Change the Python test server and local_test_server_*.cc to 154 # use a unified byte order (either big-endian or little-endian). 155 data_length = os.read(self.pipe_in, struct.calcsize('=L')) 156 if data_length: 157 (data_length,) = struct.unpack('=L', data_length) 158 assert data_length 159 if not data_length: 160 logging.error('Failed to get length of server data.') 161 return False 162 port_json = os.read(self.pipe_in, data_length) 163 if not port_json: 164 logging.error('Failed to get server data.') 165 return False 166 logging.info('Got port json data: %s', port_json) 167 port_json = json.loads(port_json) 168 if port_json.has_key('port') and isinstance(port_json['port'], int): 169 self.host_port = port_json['port'] 170 return _CheckPortStatus(self.host_port, True) 171 logging.error('Failed to get port information from the server data.') 172 return False 173 174 def _GenerateCommandLineArguments(self): 175 """Generates the command line to run the test server. 176 177 Note that all options are processed by following the definitions in 178 testserver.py. 179 """ 180 if self.command_line: 181 return 182 183 args_copy = dict(self.arguments) 184 185 # Translate the server type. 186 type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type')) 187 if type_cmd: 188 self.command_line.append(type_cmd) 189 190 # Use a pipe to get the port given by the instance of Python test server 191 # if the test does not specify the port. 192 assert self.host_port == args_copy['port'] 193 if self.host_port == 0: 194 (self.pipe_in, self.pipe_out) = os.pipe() 195 self.command_line.append('--startup-pipe=%d' % self.pipe_out) 196 197 # Pass the remaining arguments as-is. 198 for key, values in args_copy.iteritems(): 199 if not isinstance(values, list): 200 values = [values] 201 for value in values: 202 if value is None: 203 self.command_line.append('--%s' % key) 204 else: 205 self.command_line.append('--%s=%s' % (key, value)) 206 207 def _CloseUnnecessaryFDsForTestServerProcess(self): 208 # This is required to avoid subtle deadlocks that could be caused by the 209 # test server child process inheriting undesirable file descriptors such as 210 # file lock file descriptors. 211 for fd in xrange(0, 1024): 212 if fd != self.pipe_out: 213 try: 214 os.close(fd) 215 except: 216 pass 217 218 def run(self): 219 logging.info('Start running the thread!') 220 self.wait_event.clear() 221 self._GenerateCommandLineArguments() 222 command = constants.DIR_SOURCE_ROOT 223 if self.arguments['server-type'] == 'sync': 224 command = [os.path.join(command, 'sync', 'tools', 'testserver', 225 'sync_testserver.py')] + self.command_line 226 else: 227 command = [os.path.join(command, 'net', 'tools', 'testserver', 228 'testserver.py')] + self.command_line 229 logging.info('Running: %s', command) 230 # Pass DIR_SOURCE_ROOT as the child's working directory so that relative 231 # paths in the arguments are resolved correctly. 232 self.process = subprocess.Popen( 233 command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess, 234 cwd=constants.DIR_SOURCE_ROOT) 235 if self.process: 236 if self.pipe_out: 237 self.is_ready = self._WaitToStartAndGetPortFromTestServer() 238 else: 239 self.is_ready = _CheckPortStatus(self.host_port, True) 240 if self.is_ready: 241 Forwarder.Map([(0, self.host_port)], self.device, self.tool) 242 # Check whether the forwarder is ready on the device. 243 self.is_ready = False 244 device_port = Forwarder.DevicePortForHostPort(self.host_port) 245 if device_port and _CheckDevicePortStatus(self.device, device_port): 246 self.is_ready = True 247 self.forwarder_device_port = device_port 248 # Wake up the request handler thread. 249 self.ready_event.set() 250 # Keep thread running until Stop() gets called. 251 _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint) 252 if self.process.poll() is None: 253 self.process.kill() 254 Forwarder.UnmapDevicePort(self.forwarder_device_port, self.device) 255 self.process = None 256 self.is_ready = False 257 if self.pipe_out: 258 os.close(self.pipe_in) 259 os.close(self.pipe_out) 260 self.pipe_in = None 261 self.pipe_out = None 262 logging.info('Test-server has died.') 263 self.wait_event.set() 264 265 def Stop(self): 266 """Blocks until the loop has finished. 267 268 Note that this must be called in another thread. 269 """ 270 if not self.process: 271 return 272 self.stop_flag = True 273 self.wait_event.wait() 274 275 276 class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): 277 """A handler used to process http GET/POST request.""" 278 279 def _SendResponse(self, response_code, response_reason, additional_headers, 280 contents): 281 """Generates a response sent to the client from the provided parameters. 282 283 Args: 284 response_code: number of the response status. 285 response_reason: string of reason description of the response. 286 additional_headers: dict of additional headers. Each key is the name of 287 the header, each value is the content of the header. 288 contents: string of the contents we want to send to client. 289 """ 290 self.send_response(response_code, response_reason) 291 self.send_header('Content-Type', 'text/html') 292 # Specify the content-length as without it the http(s) response will not 293 # be completed properly (and the browser keeps expecting data). 294 self.send_header('Content-Length', len(contents)) 295 for header_name in additional_headers: 296 self.send_header(header_name, additional_headers[header_name]) 297 self.end_headers() 298 self.wfile.write(contents) 299 self.wfile.flush() 300 301 def _StartTestServer(self): 302 """Starts the test server thread.""" 303 logging.info('Handling request to spawn a test server.') 304 content_type = self.headers.getheader('content-type') 305 if content_type != 'application/json': 306 raise Exception('Bad content-type for start request.') 307 content_length = self.headers.getheader('content-length') 308 if not content_length: 309 content_length = 0 310 try: 311 content_length = int(content_length) 312 except: 313 raise Exception('Bad content-length for start request.') 314 logging.info(content_length) 315 test_server_argument_json = self.rfile.read(content_length) 316 logging.info(test_server_argument_json) 317 assert not self.server.test_server_instance 318 ready_event = threading.Event() 319 self.server.test_server_instance = TestServerThread( 320 ready_event, 321 json.loads(test_server_argument_json), 322 self.server.device, 323 self.server.tool) 324 self.server.test_server_instance.setDaemon(True) 325 self.server.test_server_instance.start() 326 ready_event.wait() 327 if self.server.test_server_instance.is_ready: 328 self._SendResponse(200, 'OK', {}, json.dumps( 329 {'port': self.server.test_server_instance.forwarder_device_port, 330 'message': 'started'})) 331 logging.info('Test server is running on port: %d.', 332 self.server.test_server_instance.host_port) 333 else: 334 self.server.test_server_instance.Stop() 335 self.server.test_server_instance = None 336 self._SendResponse(500, 'Test Server Error.', {}, '') 337 logging.info('Encounter problem during starting a test server.') 338 339 def _KillTestServer(self): 340 """Stops the test server instance.""" 341 # There should only ever be one test server at a time. This may do the 342 # wrong thing if we try and start multiple test servers. 343 if not self.server.test_server_instance: 344 return 345 port = self.server.test_server_instance.host_port 346 logging.info('Handling request to kill a test server on port: %d.', port) 347 self.server.test_server_instance.Stop() 348 # Make sure the status of test server is correct before sending response. 349 if _CheckPortStatus(port, False): 350 self._SendResponse(200, 'OK', {}, 'killed') 351 logging.info('Test server on port %d is killed', port) 352 else: 353 self._SendResponse(500, 'Test Server Error.', {}, '') 354 logging.info('Encounter problem during killing a test server.') 355 self.server.test_server_instance = None 356 357 def do_POST(self): 358 parsed_path = urlparse.urlparse(self.path) 359 action = parsed_path.path 360 logging.info('Action for POST method is: %s.', action) 361 if action == '/start': 362 self._StartTestServer() 363 else: 364 self._SendResponse(400, 'Unknown request.', {}, '') 365 logging.info('Encounter unknown request: %s.', action) 366 367 def do_GET(self): 368 parsed_path = urlparse.urlparse(self.path) 369 action = parsed_path.path 370 params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1) 371 logging.info('Action for GET method is: %s.', action) 372 for param in params: 373 logging.info('%s=%s', param, params[param][0]) 374 if action == '/kill': 375 self._KillTestServer() 376 elif action == '/ping': 377 # The ping handler is used to check whether the spawner server is ready 378 # to serve the requests. We don't need to test the status of the test 379 # server when handling ping request. 380 self._SendResponse(200, 'OK', {}, 'ready') 381 logging.info('Handled ping request and sent response.') 382 else: 383 self._SendResponse(400, 'Unknown request', {}, '') 384 logging.info('Encounter unknown request: %s.', action) 385 386 387 class SpawningServer(object): 388 """The class used to start/stop a http server.""" 389 390 def __init__(self, test_server_spawner_port, device, tool): 391 logging.info('Creating new spawner on port: %d.', test_server_spawner_port) 392 self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port), 393 SpawningServerRequestHandler) 394 self.server.device = device 395 self.server.tool = tool 396 self.server.test_server_instance = None 397 self.server.build_type = constants.GetBuildType() 398 399 def _Listen(self): 400 logging.info('Starting test server spawner') 401 self.server.serve_forever() 402 403 def Start(self): 404 """Starts the test server spawner.""" 405 listener_thread = threading.Thread(target=self._Listen) 406 listener_thread.setDaemon(True) 407 listener_thread.start() 408 409 def Stop(self): 410 """Stops the test server spawner. 411 412 Also cleans the server state. 413 """ 414 self.CleanupState() 415 self.server.shutdown() 416 417 def CleanupState(self): 418 """Cleans up the spawning server state. 419 420 This should be called if the test server spawner is reused, 421 to avoid sharing the test server instance. 422 """ 423 if self.server.test_server_instance: 424 self.server.test_server_instance.Stop() 425 self.server.test_server_instance = None 426