Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/env python
      2 #
      3 # Copyright (C) 2017 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #      http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 #
     17 
     18 import datetime
     19 import logging
     20 import webapp2
     21 
     22 from webapp.src import vtslab_status as Status
     23 from webapp.src.proto import model
     24 from webapp.src.utils import logger
     25 
     26 
     27 def StrGT(left, right):
     28     """Returns true if `left` string is greater than `right` in value."""
     29     if len(left) > len(right):
     30         right = "0" * (len(left) - len(right)) + right
     31     elif len(right) > len(left):
     32         left = "0" * (len(right) - len(left)) + left
     33     return left > right
     34 
     35 
     36 class PeriodicScheduler(webapp2.RequestHandler):
     37     """Main class for /tasks/schedule servlet.
     38 
     39     This class creates jobs from registered schedules periodically.
     40 
     41     Attributes:
     42         logger: Logger class
     43     """
     44     logger = logger.Logger()
     45 
     46     def ReserveDevices(self, target_device_serials):
     47         """Reserves devices.
     48 
     49         Args:
     50             target_device_serials: a list of strings, containing target device
     51                                    serial numbers.
     52         """
     53         device_query = model.DeviceModel.query(
     54             model.DeviceModel.serial.IN(target_device_serials))
     55         devices = device_query.fetch()
     56         for device in devices:
     57             device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
     58                 "reserved"]
     59             device.put()
     60 
     61     def FindBuildId(self, new_job):
     62         """Finds build ID for a new job.
     63 
     64         Args:
     65             new_job: JobModel, a new job.
     66 
     67         Return:
     68             string, build ID found.
     69         """
     70         build_id = ""
     71         build_query = model.BuildModel.query(
     72             model.BuildModel.manifest_branch == new_job.manifest_branch)
     73         builds = build_query.fetch()
     74 
     75         if builds:
     76             self.logger.Println("-- Find build ID")
     77             # Remove builds if build_id info is none
     78             build_id_filled = [x for x in builds if x.build_id]
     79             sorted_list = sorted(
     80                 build_id_filled, key=lambda x: int(x.build_id), reverse=True)
     81             filtered_list = [
     82                 x for x in sorted_list
     83                 if (all(
     84                     hasattr(x, attrs)
     85                     for attrs in ["build_target", "build_type", "build_id"])
     86                     and x.build_target and x.build_type)
     87             ]
     88             for device_build in filtered_list:
     89                 candidate_build_target = "-".join(
     90                     [device_build.build_target, device_build.build_type])
     91                 if (new_job.build_target == candidate_build_target and
     92                     (not new_job.require_signed_device_build or
     93                      device_build.signed)):
     94                     build_id = device_build.build_id
     95                     break
     96         return build_id
     97 
     98     def get(self):
     99         """Generates an HTML page based on the task schedules kept in DB."""
    100         self.logger.Clear()
    101 
    102         schedule_query = model.ScheduleModel.query()
    103         schedules = schedule_query.fetch()
    104 
    105         if schedules:
    106             for schedule in schedules:
    107                 self.logger.Println("Schedule: %s (%s %s)" %
    108                                     (schedule.test_name,
    109                                      schedule.manifest_branch,
    110                                      schedule.build_target))
    111                 self.logger.Indent()
    112                 if self.NewPeriod(schedule):
    113                     self.logger.Println("- Need new job")
    114                     target_host, target_device, target_device_serials =\
    115                         self.SelectTargetLab(schedule)
    116                     self.logger.Println("- Target host: %s" % target_host)
    117                     self.logger.Println("- Target device: %s" % target_device)
    118                     self.logger.Println(
    119                         "- Target serials: %s" % target_device_serials)
    120                     # TODO: update device status
    121 
    122                     # create job and add.
    123                     if target_host:
    124                         new_job = model.JobModel()
    125                         new_job.hostname = target_host
    126                         new_job.priority = schedule.priority
    127                         new_job.test_name = schedule.test_name
    128                         new_job.require_signed_device_build = (
    129                             schedule.require_signed_device_build)
    130                         new_job.device = target_device
    131                         new_job.period = schedule.period
    132                         new_job.serial.extend(target_device_serials)
    133                         new_job.build_storage_type = schedule.build_storage_type
    134                         new_job.manifest_branch = schedule.manifest_branch
    135                         new_job.build_target = schedule.build_target
    136                         new_job.shards = schedule.shards
    137                         new_job.param = schedule.param
    138                         new_job.retry_count = schedule.retry_count
    139                         new_job.gsi_storage_type = schedule.gsi_storage_type
    140                         new_job.gsi_branch = schedule.gsi_branch
    141                         new_job.gsi_build_target = schedule.gsi_build_target
    142                         new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
    143                         new_job.test_storage_type = schedule.test_storage_type
    144                         new_job.test_branch = schedule.test_branch
    145                         new_job.test_build_target = schedule.test_build_target
    146                         new_job.test_pab_account_id = (
    147                             schedule.test_pab_account_id)
    148 
    149                         new_job.build_id = ""
    150 
    151                         if new_job.build_storage_type == (
    152                                 Status.STORAGE_TYPE_DICT["PAB"]):
    153                             new_job.build_id = self.FindBuildId(new_job)
    154                             if new_job.build_id:
    155                                 self.ReserveDevices(target_device_serials)
    156                                 new_job.status = Status.JOB_STATUS_DICT[
    157                                     "ready"]
    158                                 new_job.timestamp = datetime.datetime.now()
    159                                 new_job.put()
    160                                 self.logger.Println("NEW JOB")
    161                             else:
    162                                 self.logger.Println("NO BUILD FOUND")
    163                         elif new_job.build_storage_type == (
    164                                 Status.STORAGE_TYPE_DICT["GCS"]):
    165                             new_job.status = Status.JOB_STATUS_DICT["ready"]
    166                             new_job.timestamp = datetime.datetime.now()
    167                             new_job.put()
    168                             self.logger.Println("NEW JOB - GCS")
    169                         else:
    170                             self.logger.Println("Unexpected storage type.")
    171 
    172                 self.logger.Unindent()
    173 
    174         self.response.write(
    175             "<pre>\n" + "\n".join(self.logger.Get()) + "\n</pre>")
    176 
    177     def NewPeriod(self, schedule):
    178         """Checks whether a new job creation is needed.
    179 
    180         Args:
    181             schedule: a proto containing schedule information.
    182 
    183         Returns:
    184             True if new job is required, False otherwise.
    185         """
    186         job_query = model.JobModel.query(
    187             model.JobModel.manifest_branch == schedule.manifest_branch,
    188             model.JobModel.build_target == schedule.build_target,
    189             model.JobModel.test_name == schedule.test_name,
    190             model.JobModel.period == schedule.period,
    191             model.JobModel.shards == schedule.shards,
    192             model.JobModel.retry_count == schedule.retry_count,
    193             model.JobModel.gsi_branch == schedule.gsi_branch,
    194             model.JobModel.test_branch == schedule.test_branch)
    195         same_jobs = job_query.fetch()
    196         same_jobs = [
    197             x for x in same_jobs
    198             if (set(x.param) == set(schedule.param)
    199                 and x.device in schedule.device)
    200         ]
    201         if not same_jobs:
    202             return True
    203 
    204         outdated_jobs = [
    205             x for x in same_jobs
    206             if (datetime.datetime.now() - x.timestamp > datetime.timedelta(
    207                 minutes=x.period))
    208         ]
    209         outdated_ready_jobs = [
    210             x for x in outdated_jobs
    211             if x.status == Status.JOB_STATUS_DICT["expired"]
    212         ]
    213 
    214         if outdated_ready_jobs:
    215             msg = ("Job key[{}] is(are) outdated. "
    216                    "They became infra-err status.").format(
    217                        ", ".join(
    218                            [str(x.key.id()) for x in outdated_ready_jobs]))
    219             logging.debug(msg)
    220             self.logger.Println(msg)
    221             for job in outdated_ready_jobs:
    222                 job.status = Status.JOB_STATUS_DICT["infra-err"]
    223                 job.put()
    224 
    225         outdated_leased_jobs = [
    226             x for x in outdated_jobs
    227             if x.status == Status.JOB_STATUS_DICT["leased"]
    228         ]
    229         if outdated_leased_jobs:
    230             msg = ("Job key[{}] is(are) expected to be completed "
    231                    "however still in leased status.").format(
    232                        ", ".join(
    233                            [str(x.key.id()) for x in outdated_leased_jobs]))
    234             logging.debug(msg)
    235             self.logger.Println(msg)
    236 
    237         recent_jobs = [x for x in same_jobs if x not in outdated_jobs]
    238 
    239         if recent_jobs or outdated_leased_jobs:
    240             return False
    241         else:
    242             return True
    243 
    244     def SelectTargetLab(self, schedule):
    245         """Find target host and devices to schedule a new job.
    246 
    247         Args:
    248             schedule: a proto containing the information of a schedule.
    249 
    250         Returns:
    251             a string which represents hostname,
    252             a string containing target lab and product with '/' separator,
    253             a list of selected devices serial (see whether devices will be
    254             selected later when the job is picked up.)
    255         """
    256         for target_device in schedule.device:
    257             if "/" not in target_device:
    258                 # device malformed
    259                 continue
    260 
    261             target_lab, target_product_type = target_device.split("/")
    262             self.logger.Println("- Seeking product %s in lab %s" %
    263                                 (target_product_type, target_lab))
    264             self.logger.Indent()
    265             lab_query = model.LabModel.query(model.LabModel.name == target_lab)
    266             target_labs = lab_query.fetch()
    267 
    268             available_devices = {}
    269             if target_labs:
    270                 for lab in target_labs:
    271                     self.logger.Println("- target lab found")
    272                     self.logger.Println("- target device %s %s" %
    273                                         (lab.hostname, target_product_type))
    274                     self.logger.Indent()
    275                     device_query = model.DeviceModel.query(
    276                         model.DeviceModel.hostname == lab.hostname)
    277                     host_devices = device_query.fetch()
    278 
    279                     for device in host_devices:
    280                         self.logger.Println("- check device %s %s" %
    281                                             (device.status, device.product))
    282                         if ((device.status in [
    283                                 Status.DEVICE_STATUS_DICT["fastboot"],
    284                                 Status.DEVICE_STATUS_DICT["online"],
    285                                 Status.DEVICE_STATUS_DICT["ready"]
    286                         ]) and (device.scheduling_status ==
    287                                 Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
    288                                 and device.product == target_product_type):
    289                             self.logger.Println(
    290                                 "- a device found %s" % device.serial)
    291                             if device.hostname not in available_devices:
    292                                 available_devices[device.hostname] = set()
    293                             available_devices[device.hostname].add(
    294                                 device.serial)
    295                     self.logger.Unindent()
    296                 for host in available_devices:
    297                     self.logger.Println("- len(devices) %s >= shards %s ?" %
    298                                         (len(available_devices[host]),
    299                                          schedule.shards))
    300                     if len(available_devices[host]) >= schedule.shards:
    301                         self.logger.Unindent()
    302                         return host, target_device, list(
    303                             available_devices[host])[:schedule.shards]
    304             self.logger.Unindent()
    305         return None, None, []
    306