#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import cmd
import datetime
import imp  # Python v2 compatibility
import multiprocessing
import multiprocessing.pool
import os
import re
import signal
import socket
import sys
import threading
import time
import urlparse

from host_controller import common
from host_controller.command_processor import command_build
from host_controller.command_processor import command_config
from host_controller.command_processor import command_copy
from host_controller.command_processor import command_device
from host_controller.command_processor import command_exit
from host_controller.command_processor import command_fetch
from host_controller.command_processor import command_flash
from host_controller.command_processor import command_gsispl
from host_controller.command_processor import command_info
from host_controller.command_processor import command_lease
from host_controller.command_processor import command_list
from host_controller.command_processor import command_retry
from host_controller.command_processor import command_request
from host_controller.command_processor import command_test
from host_controller.command_processor import command_upload
from host_controller.build import build_provider_ab
from host_controller.build import build_provider_gcs
from host_controller.build import build_provider_local_fs
from host_controller.build import build_provider_pab
from host_controller.utils.ipc import shared_dict
from host_controller.vti_interface import vti_endpoint_client

# Command processor classes that Console.SetUpCommandProcessors instantiates
# and binds as do_<command> / help_<command> methods.
COMMAND_PROCESSORS = [
    command_build.CommandBuild,
    command_config.CommandConfig,
    command_copy.CommandCopy,
    command_device.CommandDevice,
    command_exit.CommandExit,
    command_fetch.CommandFetch,
    command_flash.CommandFlash,
    command_gsispl.CommandGsispl,
    command_info.CommandInfo,
    command_lease.CommandLease,
    command_list.CommandList,
    command_retry.CommandRetry,
    command_request.CommandRequest,
    command_test.CommandTest,
    command_upload.CommandUpload,
]


class NonDaemonizedProcess(multiprocessing.Process):
    """Process class which is not daemonized."""

    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        # Deliberately ignore attempts to change the flag so that the
        # `daemon` property always reads False.
        pass

    daemon = property(_get_daemon, _set_daemon)


class NonDaemonizedPool(multiprocessing.pool.Pool):
    """Pool class which is not daemonized."""

    Process = NonDaemonizedProcess


def JobMain(vti_address, in_queue, out_queue, device_status):
    """Main() for a child process that executes a leased job.

    Currently, lease jobs must use VTI (not TFC).

    Args:
        vti_address: string, VTI service URI used to create the
                     VtiEndpointClient for this process.
        in_queue: Queue to get new jobs.
        out_queue: Queue to put execution results (not used by this
                   function at the moment).
        device_status: SharedDict, contains device status information.
                       shared between processes.
    """
    if not vti_address:
        print("vti address is not set. example : $ run --vti=<url>")
        return

    def SigTermHandler(signum, frame):
        """Signal handler for exiting pool process explicitly.

        Added to resolve orphaned pool process issue.
        """
        sys.exit(0)

    signal.signal(signal.SIGTERM, SigTermHandler)

    vti_client = vti_endpoint_client.VtiEndpointClient(vti_address)
    console = Console(
        vti_client,
        None,
        build_provider_pab.BuildProviderPAB(),
        None,
        job_pool=True)
    console.device_status = device_status
    # Register Console.__exit__ as a finalizer for this process.
    multiprocessing.util.Finalize(console, console.__exit__, exitpriority=0)

    while True:
        command = in_queue.get()
        if command == "exit":
            break
        elif command == "lease":
            filepath, kwargs = vti_client.LeaseJob(socket.gethostname(), True)
            print("Job %s -> %s" % (os.getpid(), kwargs))
            if filepath is not None:
                # TODO: redirect console output and add
                # console command to access them.

                for serial in kwargs["serial"]:
                    console.device_status[serial] = common._DEVICE_STATUS_DICT[
                        "use"]
                print_to_console = True
                if not print_to_console:
                    # NOTE(review): `out` and `err` are not defined anywhere
                    # in this function; this branch is currently unreachable
                    # and must be given real file objects before
                    # print_to_console can be set to False.
                    sys.stdout = out
                    sys.stderr = err

                ret = console.ProcessConfigurableScript(
                    os.path.join(os.getcwd(), "host_controller", "campaigns",
                                 filepath), **kwargs)
                if ret:
                    job_status = "complete"
                else:
                    job_status = "infra-err"

                vti_client.StopHeartbeat(job_status)
                print("Job execution complete. "
                      "Setting job status to {}".format(job_status))

                if not print_to_console:
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                for serial in kwargs["serial"]:
                    console.device_status[serial] = common._DEVICE_STATUS_DICT[
                        "ready"]
        else:
            print("Unknown job command %s" % command)


class Console(cmd.Cmd):
    """The console for host controllers.

    Attributes:
        command_processors: dict of string:BaseCommandProcessor,
                            map between command string and command processors.
        device_image_info: dict containing info about device image files.
        fetch_info: dict read by FormatString for the "build_id", "branch"
                    and "target" variables.
        prompt: The prompt string at the beginning of each command line.
        test_result: dict containing info about the last test result.
        test_suite_info: dict containing info about test suite package files.
        tools_info: dict containing info about custom tool files.
        scheduler_thread: dict containing threading.Thread instances(s) that
                          update configs regularly.
        _build_provider: dict of string:build provider object. Always holds
                         "pab"; "local_fs", "gcs" and "ab" are added only
                         when the Console is not created from a job pool
                         process.
        _vti_address: string, VTI service URI.
        _vti_endpoint_client: VtiEndpointClient, used to upload data to a
                              test scheduling infrastructure.
        _tfc_client: The TfcClient that the host controllers connect to.
        _hosts: A list of HostController objects.
        _in_file: The input file object.
        _out_file: The output file object.
        _serials: A list of string where each string is a device serial.
        _device_status: SharedDict, shared with process pool.
                        contains status data on each devices.
        _job_pool: bool, True if Console is created from job pool process
                   context. Replaced by the NonDaemonizedPool instance once
                   StartJobThreadAndProcessPool has been called.
    """

    def __init__(self,
                 vti_endpoint_client,
                 tfc,
                 pab,
                 host_controllers,
                 vti_address=None,
                 in_file=sys.stdin,
                 out_file=sys.stdout,
                 job_pool=False):
        """Initializes the attributes and the parsers."""
        # cmd.Cmd is old-style class.
        cmd.Cmd.__init__(self, stdin=in_file, stdout=out_file)
        self._build_provider = {}
        self._build_provider["pab"] = pab
        self._job_pool = job_pool
        if not self._job_pool:
            self._build_provider[
                "local_fs"] = build_provider_local_fs.BuildProviderLocalFS()
            self._build_provider["gcs"] = build_provider_gcs.BuildProviderGCS()
            self._build_provider["ab"] = build_provider_ab.BuildProviderAB()
        self._vti_endpoint_client = vti_endpoint_client
        self._vti_address = vti_address
        self._tfc_client = tfc
        self._hosts = host_controllers
        self._in_file = in_file
        self._out_file = out_file
        self.prompt = "> "
        self.command_processors = {}
        self.device_image_info = {}
        self.test_result = {}
        self.test_suite_info = {}
        self.tools_info = {}
        self.fetch_info = {}
        self.test_results = {}
        self._device_status = shared_dict.SharedDict()

        if common._ANDROID_SERIAL in os.environ:
            self._serials = [os.environ[common._ANDROID_SERIAL]]
        else:
            self._serials = []

        self.InitCommandModuleParsers()
        self.SetUpCommandProcessors()

    def __exit__(self):
        """Finalizes the build provider attributes explicitly when exited."""
        for bp in self._build_provider:
            self._build_provider[bp].__del__()

    @property
    def device_status(self):
        """getter for self._device_status"""
        return self._device_status

    @device_status.setter
    def device_status(self, device_status):
        """setter for self._device_status"""
        self._device_status = device_status

    def InitCommandModuleParsers(self):
        """Init all console command modules.

        Calls every callable attribute whose name starts with "_Init" and
        ends with "Parser".
        """
        for name in dir(self):
            if name.startswith('_Init') and name.endswith('Parser'):
                attr_func = getattr(self, name)
                if hasattr(attr_func, '__call__'):
                    attr_func()

    def SetUpCommandProcessors(self):
        """Sets up all command processors.

        Each processor is bound to the console as do_<command> and
        help_<command> so that cmd.Cmd can dispatch to it.
        """
        for command_processor in COMMAND_PROCESSORS:
            cp = command_processor()
            cp._SetUp(self)
            do_text = "do_%s" % cp.command
            help_text = "help_%s" % cp.command
            setattr(self, do_text, cp._Run)
            setattr(self, help_text, cp._Help)
            self.command_processors[cp.command] = cp

    def TearDown(self):
        """Removes all command processors."""
        for command_processor in self.command_processors.itervalues():
            command_processor._TearDown()
        self.command_processors.clear()

    def FormatString(self, format_string):
        """Replaces variables with the values in the console's dictionaries.

        Args:
            format_string: The string containing variables enclosed in {}.

        Returns:
            The formatted string.

        Raises:
            KeyError if a variable is not found in the dictionaries or the
            value is empty.
        """

        def ReplaceVariable(match):
            name = match.group(1)
            if name in ("build_id", "branch", "target"):
                value = self.fetch_info[name]
            elif name in ("result_zip", "suite_plan"):
                value = self.test_result[name]
            elif name == "timestamp":
                # An equality test is required here: `name in ("timestamp")`
                # is a substring test against a plain string and would also
                # accept names such as "time" or "stamp".
                value = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
            else:
                value = None

            if not value:
                raise KeyError(name)

            return value

        return re.sub("{([^}]+)}", ReplaceVariable, format_string)

    def ProcessScript(self, script_file_path):
        """Processes a .py script file.

        A script file implements a function which emits a list of console
        commands to execute. That function emits an empty list or None if
        no more command needs to be processed.

        Args:
            script_file_path: string, the path of a script file (.py file).

        Returns:
            True if successful; False otherwise
        """
        if not script_file_path.endswith(".py"):
            print("Script file is not .py file: %s" % script_file_path)
            return False

        script_module = imp.load_source('script_module', script_file_path)

        commands = script_module.EmitConsoleCommands()
        if commands:
            for command in commands:
                ret = self.onecmd(command)
                # onecmd returns None when a command gives no result, so
                # only an explicit False counts as a failure.
                if ret == False:
                    return False
        return True

    def ProcessConfigurableScript(self, script_file_path, **kwargs):
        """Processes a .py script file.

        A script file implements a function which emits a list of console
        commands to execute. That function emits an empty list or None if
        no more command needs to be processed.

        Unlike ProcessScript, an empty or None command list is treated as a
        failure.

        Args:
            script_file_path: string, the path of a script file (.py file).
                              ".py" is appended when the path contains no
                              "." character.
            kwargs: extra args for the interface function defined in
                    the script file.

        Returns:
            True if successful; False otherwise
        """
        if script_file_path and "." not in script_file_path:
            script_file_path += ".py"

        if not script_file_path.endswith(".py"):
            print("Script file is not .py file: %s" % script_file_path)
            return False

        script_module = imp.load_source('script_module', script_file_path)

        commands = script_module.EmitConsoleCommands(**kwargs)
        if commands:
            for command in commands:
                ret = self.onecmd(command)
                # onecmd returns None when a command gives no result, so
                # only an explicit False counts as a failure.
                if ret == False:
                    return False
        else:
            return False
        return True

    def _Print(self, string):
        """Prints a string and a new line character.

        Args:
            string: The string to be printed.
        """
        self._out_file.write(string + "\n")

    def _PrintObjects(self, objects, attr_names):
        """Shows objects as a table.

        Args:
            objects: The objects to be shown, one object in a row.
            attr_names: The attributes to be shown, one attribute in a column.
        """
        # Column widths start at the header widths and grow to fit the data.
        width = [len(name) for name in attr_names]
        rows = [attr_names]
        for dev_info in objects:
            attrs = [
                _ToPrintString(getattr(dev_info, name, ""))
                for name in attr_names
            ]
            rows.append(attrs)
            for index, attr in enumerate(attrs):
                width[index] = max(width[index], len(attr))

        for row in rows:
            self._Print(" ".join(
                attr.ljust(width[index]) for index, attr in enumerate(row)))

    def DownloadTestResources(self, request_id):
        """Download all of the test resources for a TFC request id.

        Args:
            request_id: int, TFC request id
        """
        resources = self._tfc_client.TestResourceList(request_id)
        for resource in resources:
            self.DownloadTestResource(resource['url'])

    def DownloadTestResource(self, url):
        """Download a test resource with build provider, given a url.

        The URL scheme selects the build provider:
            pab://<account_id>/<branch>/<target>/<build_id>/<artifact_name>
            ab://<branch>/<target>/<build_id>/<artifact_name>
            gcs://... (the whole URL is passed to fetch as --path)

        Args:
            url: a resource locator (not necessarily HTTP[s])
                 with the scheme specifying the build provider.
        """
        parsed = urlparse.urlparse(url)
        path = (parsed.netloc + parsed.path).split('/')
        if parsed.scheme == "pab":
            if len(path) != 5:
                print("Invalid pab resource locator: %s" % url)
                return
            account_id, branch, target, build_id, artifact_name = path
            command = ("fetch"
                       " --type=pab"
                       " --account_id=%s"
                       " --branch=%s"
                       " --target=%s"
                       " --build_id=%s"
                       " --artifact_name=%s") % (account_id, branch, target,
                                                 build_id, artifact_name)
            self.onecmd(command)
        elif parsed.scheme == "ab":
            if len(path) != 4:
                print("Invalid ab resource locator: %s" % url)
                return
            branch, target, build_id, artifact_name = path
            # Every option needs a leading space; without it the command
            # name and the first option fuse into "fetch--type=ab".
            command = ("fetch"
                       " --type=ab"
                       " --branch=%s"
                       " --target=%s"
                       " --build_id=%s"
                       " --artifact_name=%s") % (branch, target, build_id,
                                                 artifact_name)
            self.onecmd(command)
        elif parsed.scheme == "gcs":
            command = "fetch --type=gcs --path=%s" % url
            self.onecmd(command)
        else:
            print("Invalid URL: %s" % url)

    def SetSerials(self, serials):
        """Sets the default serial numbers for flashing and testing.

        Args:
            serials: A list of strings, the serial numbers.
        """
        self._serials = serials

    def GetSerials(self):
        """Returns the serial numbers saved in the console.

        Returns:
            A list of strings, the serial numbers.
        """
        return self._serials

    def JobThread(self):
        """Job thread which monitors and uploads results.

        Sleeps until the thread's `keep_running` attribute is set to False
        (see StopJobThreadAndProcessPool), then shuts the process pool down.
        """
        thread = threading.currentThread()
        while getattr(thread, "keep_running", True):
            time.sleep(1)

        if self._job_pool:
            self._job_pool.close()
            self._job_pool.terminate()
            self._job_pool.join()

    def StartJobThreadAndProcessPool(self):
        """Starts a background thread to control leased jobs."""
        self._job_in_queue = multiprocessing.Queue()
        self._job_out_queue = multiprocessing.Queue()
        # JobMain is passed as the pool initializer, so each worker process
        # runs it with the arguments below.
        self._job_pool = NonDaemonizedPool(
            common._MAX_LEASED_JOBS, JobMain,
            (self._vti_address, self._job_in_queue, self._job_out_queue,
             self._device_status))

        self._job_thread = threading.Thread(target=self.JobThread)
        self._job_thread.daemon = True
        self._job_thread.start()

    def StopJobThreadAndProcessPool(self):
        """Terminates the thread and processes that runs the leased job."""
        if hasattr(self, "_job_thread"):
            self._job_thread.keep_running = False
            self._job_thread.join()

    # @Override
    def onecmd(self, line, depth=1, ret_out_queue=None):
        """Executes command(s) and prints any exception.

        Parallel execution only for 2nd-level list element.

        Args:
            line: a list of string or string which keeps the command to run.
            depth: int, nesting level of `line`. A list received at depth 1
                   is run with one subprocess per element; lists at deeper
                   levels are run sequentially.
            ret_out_queue: Queue or None. When given, False is put on it if
                           a command fails.

        Returns:
            False if a command failed or raised an exception; otherwise the
            value returned by cmd.Cmd.onecmd for a single command, or None
            for an empty line or a list of commands.
        """
        if not line:
            return

        if isinstance(line, list):
            if depth == 1:  # 1 to use multi-threading
                jobs = []
                ret_queue = multiprocessing.Queue()
                for sub_command in line:
                    p = multiprocessing.Process(
                        target=self.onecmd,
                        args=(
                            sub_command,
                            depth + 1,
                            ret_queue,
                        ))
                    jobs.append(p)
                    p.start()
                for job in jobs:
                    job.join()

                # Subprocesses only report failures, so an empty queue
                # leaves the overall result as True.
                ret_cmd_list = True
                while not ret_queue.empty():
                    ret_from_subprocess = ret_queue.get()
                    ret_cmd_list = ret_cmd_list and ret_from_subprocess
                if ret_cmd_list == False:
                    return False
            else:
                for sub_command in line:
                    ret_cmd_list = self.onecmd(sub_command, depth + 1)
                    if ret_cmd_list == False and ret_out_queue:
                        ret_out_queue.put(False)
                        return False
            return

        print("Command: %s" % line)
        try:
            ret_cmd = cmd.Cmd.onecmd(self, line)
            if ret_cmd == False and ret_out_queue:
                ret_out_queue.put(ret_cmd)
            return ret_cmd
        except Exception as e:
            # Report the failure on the console instead of aborting the
            # command loop.
            self._Print("%s: %s" % (type(e).__name__, e))
            if ret_out_queue:
                ret_out_queue.put(False)
            return False

    # @Override
    def emptyline(self):
        """Ignores empty lines."""
        pass

    # @Override
    def default(self, line):
        """Handles unrecognized commands.

        Returns:
            True if receives EOF; otherwise delegates to default handler.
        """
        if line == "EOF":
            return self.do_exit(line)
        return cmd.Cmd.default(self, line)


def _ToPrintString(obj):
    """Converts an object to printable string on console.

    Args:
        obj: The object to be printed.

    Returns:
        The elements joined with "," if obj is a list, tuple or set;
        otherwise str(obj).
    """
    if isinstance(obj, (list, tuple, set)):
        return ",".join(str(x) for x in obj)
    return str(obj)