Home | History | Annotate | Download | only in host_controller
      1 #
      2 # Copyright (C) 2017 The Android Open Source Project
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #      http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 #
     16 
     17 import cmd
     18 import datetime
     19 import imp  # Python v2 compatibility
     20 import multiprocessing
     21 import multiprocessing.pool
     22 import os
     23 import re
     24 import signal
     25 import socket
     26 import sys
     27 import threading
     28 import time
     29 import urlparse
     30 
     31 from host_controller import common
     32 from host_controller.command_processor import command_build
     33 from host_controller.command_processor import command_config
     34 from host_controller.command_processor import command_copy
     35 from host_controller.command_processor import command_device
     36 from host_controller.command_processor import command_exit
     37 from host_controller.command_processor import command_fetch
     38 from host_controller.command_processor import command_flash
     39 from host_controller.command_processor import command_gsispl
     40 from host_controller.command_processor import command_info
     41 from host_controller.command_processor import command_lease
     42 from host_controller.command_processor import command_list
     43 from host_controller.command_processor import command_retry
     44 from host_controller.command_processor import command_request
     45 from host_controller.command_processor import command_test
     46 from host_controller.command_processor import command_upload
     47 from host_controller.build import build_provider_ab
     48 from host_controller.build import build_provider_gcs
     49 from host_controller.build import build_provider_local_fs
     50 from host_controller.build import build_provider_pab
     51 from host_controller.utils.ipc import shared_dict
     52 from host_controller.vti_interface import vti_endpoint_client
     53 
     54 COMMAND_PROCESSORS = [
     55     command_build.CommandBuild,
     56     command_config.CommandConfig,
     57     command_copy.CommandCopy,
     58     command_device.CommandDevice,
     59     command_exit.CommandExit,
     60     command_fetch.CommandFetch,
     61     command_flash.CommandFlash,
     62     command_gsispl.CommandGsispl,
     63     command_info.CommandInfo,
     64     command_lease.CommandLease,
     65     command_list.CommandList,
     66     command_retry.CommandRetry,
     67     command_request.CommandRequest,
     68     command_test.CommandTest,
     69     command_upload.CommandUpload,
     70 ]
     71 
     72 
     73 class NonDaemonizedProcess(multiprocessing.Process):
     74     """Process class which is not daemonized."""
     75 
     76     def _get_daemon(self):
     77         return False
     78 
     79     def _set_daemon(self, value):
     80         pass
     81 
     82     daemon = property(_get_daemon, _set_daemon)
     83 
     84 
     85 class NonDaemonizedPool(multiprocessing.pool.Pool):
     86     """Pool class which is not daemonized."""
     87 
     88     Process = NonDaemonizedProcess
     89 
     90 
     91 def JobMain(vti_address, in_queue, out_queue, device_status):
     92     """Main() for a child process that executes a leased job.
     93 
     94     Currently, lease jobs must use VTI (not TFC).
     95 
     96     Args:
     97         vti_client: VtiEndpointClient needed to create Console.
     98         in_queue: Queue to get new jobs.
     99         out_queue: Queue to put execution results.
    100         device_status: SharedDict, contains device status information.
    101                        shared between processes.
    102     """
    103     if not vti_address:
    104         print("vti address is not set. example : $ run --vti=<url>")
    105         return
    106 
    107     def SigTermHandler(signum, frame):
    108         """Signal handler for exiting pool process explicitly.
    109 
    110         Added to resolve orphaned pool process issue.
    111         """
    112         sys.exit(0)
    113 
    114     signal.signal(signal.SIGTERM, SigTermHandler)
    115 
    116     vti_client = vti_endpoint_client.VtiEndpointClient(vti_address)
    117     console = Console(
    118         vti_client,
    119         None,
    120         build_provider_pab.BuildProviderPAB(),
    121         None,
    122         job_pool=True)
    123     console.device_status = device_status
    124     multiprocessing.util.Finalize(console, console.__exit__, exitpriority=0)
    125 
    126     while True:
    127         command = in_queue.get()
    128         if command == "exit":
    129             break
    130         elif command == "lease":
    131             filepath, kwargs = vti_client.LeaseJob(socket.gethostname(), True)
    132             print("Job %s -> %s" % (os.getpid(), kwargs))
    133             if filepath is not None:
    134                 # TODO: redirect console output and add
    135                 # console command to access them.
    136 
    137                 for serial in kwargs["serial"]:
    138                     console.device_status[serial] = common._DEVICE_STATUS_DICT[
    139                         "use"]
    140                 print_to_console = True
    141                 if not print_to_console:
    142                     sys.stdout = out
    143                     sys.stderr = err
    144 
    145                 ret = console.ProcessConfigurableScript(
    146                     os.path.join(os.getcwd(), "host_controller", "campaigns",
    147                                  filepath), **kwargs)
    148                 if ret:
    149                     job_status = "complete"
    150                 else:
    151                     job_status = "infra-err"
    152 
    153                 vti_client.StopHeartbeat(job_status)
    154                 print("Job execution complete. "
    155                       "Setting job status to {}".format(job_status))
    156 
    157                 if not print_to_console:
    158                     sys.stdout = sys.__stdout__
    159                     sys.stderr = sys.__stderr__
    160 
    161                 for serial in kwargs["serial"]:
    162                     console.device_status[serial] = common._DEVICE_STATUS_DICT[
    163                         "ready"]
    164         else:
    165             print("Unknown job command %s" % command)
    166 
    167 
    168 class Console(cmd.Cmd):
    169     """The console for host controllers.
    170 
    171     Attributes:
    172         command_processors: dict of string:BaseCommandProcessor,
    173                             map between command string and command processors.
    174         device_image_info: dict containing info about device image files.
    175         prompt: The prompt string at the beginning of each command line.
    176         test_result: dict containing info about the last test result.
    177         test_suite_info: dict containing info about test suite package files.
    178         tools_info: dict containing info about custom tool files.
    179         scheduler_thread: dict containing threading.Thread instances(s) that
    180                           update configs regularly.
    181         _build_provider_pab: The BuildProviderPAB used to download artifacts.
    182         _vti_address: string, VTI service URI.
    183         _vti_client: VtiEndpoewrClient, used to upload data to a test
    184                      scheduling infrastructure.
    185         _tfc_client: The TfcClient that the host controllers connect to.
    186         _hosts: A list of HostController objects.
    187         _in_file: The input file object.
    188         _out_file: The output file object.
    189         _serials: A list of string where each string is a device serial.
    190         _device_status: SharedDict, shared with process pool.
    191                         contains status data on each devices.
    192         _job_pool: bool, True if Console is created from job pool process
    193                    context.
    194     """
    195 
    196     def __init__(self,
    197                  vti_endpoint_client,
    198                  tfc,
    199                  pab,
    200                  host_controllers,
    201                  vti_address=None,
    202                  in_file=sys.stdin,
    203                  out_file=sys.stdout,
    204                  job_pool=False):
    205         """Initializes the attributes and the parsers."""
    206         # cmd.Cmd is old-style class.
    207         cmd.Cmd.__init__(self, stdin=in_file, stdout=out_file)
    208         self._build_provider = {}
    209         self._build_provider["pab"] = pab
    210         self._job_pool = job_pool
    211         if not self._job_pool:
    212             self._build_provider[
    213                 "local_fs"] = build_provider_local_fs.BuildProviderLocalFS()
    214             self._build_provider["gcs"] = build_provider_gcs.BuildProviderGCS()
    215             self._build_provider["ab"] = build_provider_ab.BuildProviderAB()
    216         self._vti_endpoint_client = vti_endpoint_client
    217         self._vti_address = vti_address
    218         self._tfc_client = tfc
    219         self._hosts = host_controllers
    220         self._in_file = in_file
    221         self._out_file = out_file
    222         self.prompt = "> "
    223         self.command_processors = {}
    224         self.device_image_info = {}
    225         self.test_result = {}
    226         self.test_suite_info = {}
    227         self.tools_info = {}
    228         self.fetch_info = {}
    229         self.test_results = {}
    230         self._device_status = shared_dict.SharedDict()
    231 
    232         if common._ANDROID_SERIAL in os.environ:
    233             self._serials = [os.environ[common._ANDROID_SERIAL]]
    234         else:
    235             self._serials = []
    236 
    237         self.InitCommandModuleParsers()
    238         self.SetUpCommandProcessors()
    239 
    240     def __exit__(self):
    241         """Finalizes the build provider attributes explicitly when exited."""
    242         for bp in self._build_provider:
    243             self._build_provider[bp].__del__()
    244 
    245     @property
    246     def device_status(self):
    247         """getter for self._device_status"""
    248         return self._device_status
    249 
    250     @device_status.setter
    251     def device_status(self, device_status):
    252         """setter for self._device_status"""
    253         self._device_status = device_status
    254 
    255     def InitCommandModuleParsers(self):
    256         """Init all console command modules"""
    257         for name in dir(self):
    258             if name.startswith('_Init') and name.endswith('Parser'):
    259                 attr_func = getattr(self, name)
    260                 if hasattr(attr_func, '__call__'):
    261                     attr_func()
    262 
    263     def SetUpCommandProcessors(self):
    264         """Sets up all command processors."""
    265         for command_processor in COMMAND_PROCESSORS:
    266             cp = command_processor()
    267             cp._SetUp(self)
    268             do_text = "do_%s" % cp.command
    269             help_text = "help_%s" % cp.command
    270             setattr(self, do_text, cp._Run)
    271             setattr(self, help_text, cp._Help)
    272             self.command_processors[cp.command] = cp
    273 
    274     def TearDown(self):
    275         """Removes all command processors."""
    276         for command_processor in self.command_processors.itervalues():
    277             command_processor._TearDown()
    278         self.command_processors.clear()
    279 
    280     def FormatString(self, format_string):
    281         """Replaces variables with the values in the console's dictionaries.
    282 
    283         Args:
    284             format_string: The string containing variables enclosed in {}.
    285 
    286         Returns:
    287             The formatted string.
    288 
    289         Raises:
    290             KeyError if a variable is not found in the dictionaries or the
    291             value is empty.
    292         """
    293 
    294         def ReplaceVariable(match):
    295             name = match.group(1)
    296             if name in ("build_id", "branch", "target"):
    297                 value = self.fetch_info[name]
    298             elif name in ("result_zip", "suite_plan"):
    299                 value = self.test_result[name]
    300             elif name in ("timestamp"):
    301                 value = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    302             else:
    303                 value = None
    304 
    305             if not value:
    306                 raise KeyError(name)
    307 
    308             return value
    309 
    310         return re.sub("{([^}]+)}", ReplaceVariable, format_string)
    311 
    312     def ProcessScript(self, script_file_path):
    313         """Processes a .py script file.
    314 
    315         A script file implements a function which emits a list of console
    316         commands to execute. That function emits an empty list or None if
    317         no more command needs to be processed.
    318 
    319         Args:
    320             script_file_path: string, the path of a script file (.py file).
    321 
    322         Returns:
    323             True if successful; False otherwise
    324         """
    325         if not script_file_path.endswith(".py"):
    326             print("Script file is not .py file: %s" % script_file_path)
    327             return False
    328 
    329         script_module = imp.load_source('script_module', script_file_path)
    330 
    331         commands = script_module.EmitConsoleCommands()
    332         if commands:
    333             for command in commands:
    334                 ret = self.onecmd(command)
    335                 if ret == False:
    336                     return False
    337         return True
    338 
    339     def ProcessConfigurableScript(self, script_file_path, **kwargs):
    340         """Processes a .py script file.
    341 
    342         A script file implements a function which emits a list of console
    343         commands to execute. That function emits an empty list or None if
    344         no more command needs to be processed.
    345 
    346         Args:
    347             script_file_path: string, the path of a script file (.py file).
    348             kwargs: extra args for the interface function defined in
    349                     the script file.
    350 
    351         Returns:
    352             True if successful; False otherwise
    353         """
    354         if script_file_path and "." not in script_file_path:
    355             script_file_path += ".py"
    356 
    357         if not script_file_path.endswith(".py"):
    358             print("Script file is not .py file: %s" % script_file_path)
    359             return False
    360 
    361         script_module = imp.load_source('script_module', script_file_path)
    362 
    363         commands = script_module.EmitConsoleCommands(**kwargs)
    364         if commands:
    365             for command in commands:
    366                 ret = self.onecmd(command)
    367                 if ret == False:
    368                     return False
    369         else:
    370             return False
    371         return True
    372 
    373     def _Print(self, string):
    374         """Prints a string and a new line character.
    375 
    376         Args:
    377             string: The string to be printed.
    378         """
    379         self._out_file.write(string + "\n")
    380 
    381     def _PrintObjects(self, objects, attr_names):
    382         """Shows objects as a table.
    383 
    384         Args:
    385             object: The objects to be shown, one object in a row.
    386             attr_names: The attributes to be shown, one attribute in a column.
    387         """
    388         width = [len(name) for name in attr_names]
    389         rows = [attr_names]
    390         for dev_info in objects:
    391             attrs = [
    392                 _ToPrintString(getattr(dev_info, name, ""))
    393                 for name in attr_names
    394             ]
    395             rows.append(attrs)
    396             for index, attr in enumerate(attrs):
    397                 width[index] = max(width[index], len(attr))
    398 
    399         for row in rows:
    400             self._Print("  ".join(
    401                 attr.ljust(width[index]) for index, attr in enumerate(row)))
    402 
    403     def DownloadTestResources(self, request_id):
    404         """Download all of the test resources for a TFC request id.
    405 
    406         Args:
    407             request_id: int, TFC request id
    408         """
    409         resources = self._tfc_client.TestResourceList(request_id)
    410         for resource in resources:
    411             self.DownloadTestResource(resource['url'])
    412 
    413     def DownloadTestResource(self, url):
    414         """Download a test resource with build provider, given a url.
    415 
    416         Args:
    417             url: a resource locator (not necessarily HTTP[s])
    418                 with the scheme specifying the build provider.
    419         """
    420         parsed = urlparse.urlparse(url)
    421         path = (parsed.netloc + parsed.path).split('/')
    422         if parsed.scheme == "pab":
    423             if len(path) != 5:
    424                 print("Invalid pab resource locator: %s" % url)
    425                 return
    426             account_id, branch, target, build_id, artifact_name = path
    427             cmd = ("fetch"
    428                    " --type=pab"
    429                    " --account_id=%s"
    430                    " --branch=%s"
    431                    " --target=%s"
    432                    " --build_id=%s"
    433                    " --artifact_name=%s") % (account_id, branch, target,
    434                                              build_id, artifact_name)
    435             self.onecmd(cmd)
    436         elif parsed.scheme == "ab":
    437             if len(path) != 4:
    438                 print("Invalid ab resource locator: %s" % url)
    439                 return
    440             branch, target, build_id, artifact_name = path
    441             cmd = ("fetch"
    442                    "--type=ab"
    443                    " --branch=%s"
    444                    " --target=%s"
    445                    " --build_id=%s"
    446                    " --artifact_name=%s") % (branch, target, build_id,
    447                                              artifact_name)
    448             self.onecmd(cmd)
    449         elif parsed.scheme == gcs:
    450             cmd = "fetch --type=gcs --path=%s" % url
    451             self.onecmd(cmd)
    452         else:
    453             print "Invalid URL: %s" % url
    454 
    455     def SetSerials(self, serials):
    456         """Sets the default serial numbers for flashing and testing.
    457 
    458         Args:
    459             serials: A list of strings, the serial numbers.
    460         """
    461         self._serials = serials
    462 
    463     def GetSerials(self):
    464         """Returns the serial numbers saved in the console.
    465 
    466         Returns:
    467             A list of strings, the serial numbers.
    468         """
    469         return self._serials
    470 
    471     def JobThread(self):
    472         """Job thread which monitors and uploads results."""
    473         thread = threading.currentThread()
    474         while getattr(thread, "keep_running", True):
    475             time.sleep(1)
    476 
    477         if self._job_pool:
    478             self._job_pool.close()
    479             self._job_pool.terminate()
    480             self._job_pool.join()
    481 
    482     def StartJobThreadAndProcessPool(self):
    483         """Starts a background thread to control leased jobs."""
    484         self._job_in_queue = multiprocessing.Queue()
    485         self._job_out_queue = multiprocessing.Queue()
    486         self._job_pool = NonDaemonizedPool(
    487             common._MAX_LEASED_JOBS, JobMain,
    488             (self._vti_address, self._job_in_queue, self._job_out_queue,
    489              self._device_status))
    490 
    491         self._job_thread = threading.Thread(target=self.JobThread)
    492         self._job_thread.daemon = True
    493         self._job_thread.start()
    494 
    495     def StopJobThreadAndProcessPool(self):
    496         """Terminates the thread and processes that runs the leased job."""
    497         if hasattr(self, "_job_thread"):
    498             self._job_thread.keep_running = False
    499             self._job_thread.join()
    500 
    501     # @Override
    502     def onecmd(self, line, depth=1, ret_out_queue=None):
    503         """Executes command(s) and prints any exception.
    504 
    505         Parallel execution only for 2nd-level list element.
    506 
    507         Args:
    508             line: a list of string or string which keeps the command to run.
    509         """
    510         if not line:
    511             return
    512 
    513         if type(line) == list:
    514             if depth == 1:  # 1 to use multi-threading
    515                 jobs = []
    516                 ret_queue = multiprocessing.Queue()
    517                 for sub_command in line:
    518                     p = multiprocessing.Process(
    519                         target=self.onecmd,
    520                         args=(
    521                             sub_command,
    522                             depth + 1,
    523                             ret_queue,
    524                         ))
    525                     jobs.append(p)
    526                     p.start()
    527                 for job in jobs:
    528                     job.join()
    529 
    530                 ret_cmd_list = True
    531                 while not ret_queue.empty():
    532                     ret_from_subprocess = ret_queue.get()
    533                     ret_cmd_list = ret_cmd_list and ret_from_subprocess
    534                 if ret_cmd_list == False:
    535                     return False
    536             else:
    537                 for sub_command in line:
    538                     ret_cmd_list = self.onecmd(sub_command, depth + 1)
    539                     if ret_cmd_list == False and ret_out_queue:
    540                         ret_out_queue.put(False)
    541                         return False
    542             return
    543 
    544         print("Command: %s" % line)
    545         try:
    546             ret_cmd = cmd.Cmd.onecmd(self, line)
    547             if ret_cmd == False and ret_out_queue:
    548                 ret_out_queue.put(ret_cmd)
    549             return ret_cmd
    550         except Exception as e:
    551             self._Print("%s: %s" % (type(e).__name__, e))
    552             if ret_out_queue:
    553                 ret_out_queue.put(False)
    554             return False
    555 
    556     # @Override
    557     def emptyline(self):
    558         """Ignores empty lines."""
    559         pass
    560 
    561     # @Override
    562     def default(self, line):
    563         """Handles unrecognized commands.
    564 
    565         Returns:
    566             True if receives EOF; otherwise delegates to default handler.
    567         """
    568         if line == "EOF":
    569             return self.do_exit(line)
    570         return cmd.Cmd.default(self, line)
    571 
    572 
    573 def _ToPrintString(obj):
    574     """Converts an object to printable string on console.
    575 
    576     Args:
    577         obj: The object to be printed.
    578     """
    579     if isinstance(obj, (list, tuple, set)):
    580         return ",".join(str(x) for x in obj)
    581     return str(obj)
    582