1 #!/usr/bin/env python 2 # 3 # Copyright (C) 2017 The Android Open Source Project 4 # 5 # Licensed under the Apache License, Version 2.0 (the "License"); 6 # you may not use this file except in compliance with the License. 7 # You may obtain a copy of the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 # See the License for the specific language governing permissions and 15 # limitations under the License. 16 # 17 18 import datetime 19 import logging 20 import webapp2 21 22 from webapp.src import vtslab_status as Status 23 from webapp.src.proto import model 24 from webapp.src.utils import logger 25 26 27 def StrGT(left, right): 28 """Returns true if `left` string is greater than `right` in value.""" 29 if len(left) > len(right): 30 right = "0" * (len(left) - len(right)) + right 31 elif len(right) > len(left): 32 left = "0" * (len(right) - len(left)) + left 33 return left > right 34 35 36 class PeriodicScheduler(webapp2.RequestHandler): 37 """Main class for /tasks/schedule servlet. 38 39 This class creates jobs from registered schedules periodically. 40 41 Attributes: 42 logger: Logger class 43 """ 44 logger = logger.Logger() 45 46 def ReserveDevices(self, target_device_serials): 47 """Reserves devices. 48 49 Args: 50 target_device_serials: a list of strings, containing target device 51 serial numbers. 52 """ 53 device_query = model.DeviceModel.query( 54 model.DeviceModel.serial.IN(target_device_serials)) 55 devices = device_query.fetch() 56 for device in devices: 57 device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[ 58 "reserved"] 59 device.put() 60 61 def FindBuildId(self, new_job): 62 """Finds build ID for a new job. 63 64 Args: 65 new_job: JobModel, a new job. 66 67 Return: 68 string, build ID found. 69 """ 70 build_id = "" 71 build_query = model.BuildModel.query( 72 model.BuildModel.manifest_branch == new_job.manifest_branch) 73 builds = build_query.fetch() 74 75 if builds: 76 self.logger.Println("-- Find build ID") 77 # Remove builds if build_id info is none 78 build_id_filled = [x for x in builds if x.build_id] 79 sorted_list = sorted( 80 build_id_filled, key=lambda x: int(x.build_id), reverse=True) 81 filtered_list = [ 82 x for x in sorted_list 83 if (all( 84 hasattr(x, attrs) 85 for attrs in ["build_target", "build_type", "build_id"]) 86 and x.build_target and x.build_type) 87 ] 88 for device_build in filtered_list: 89 candidate_build_target = "-".join( 90 [device_build.build_target, device_build.build_type]) 91 if (new_job.build_target == candidate_build_target and 92 (not new_job.require_signed_device_build or 93 device_build.signed)): 94 build_id = device_build.build_id 95 break 96 return build_id 97 98 def get(self): 99 """Generates an HTML page based on the task schedules kept in DB.""" 100 self.logger.Clear() 101 102 schedule_query = model.ScheduleModel.query() 103 schedules = schedule_query.fetch() 104 105 if schedules: 106 for schedule in schedules: 107 self.logger.Println("Schedule: %s (%s %s)" % 108 (schedule.test_name, 109 schedule.manifest_branch, 110 schedule.build_target)) 111 self.logger.Indent() 112 if self.NewPeriod(schedule): 113 self.logger.Println("- Need new job") 114 target_host, target_device, target_device_serials =\ 115 self.SelectTargetLab(schedule) 116 self.logger.Println("- Target host: %s" % target_host) 117 self.logger.Println("- Target device: %s" % target_device) 118 self.logger.Println( 119 "- Target serials: %s" % target_device_serials) 120 # TODO: update device status 121 122 # create job and add. 123 if target_host: 124 new_job = model.JobModel() 125 new_job.hostname = target_host 126 new_job.priority = schedule.priority 127 new_job.test_name = schedule.test_name 128 new_job.require_signed_device_build = ( 129 schedule.require_signed_device_build) 130 new_job.device = target_device 131 new_job.period = schedule.period 132 new_job.serial.extend(target_device_serials) 133 new_job.build_storage_type = schedule.build_storage_type 134 new_job.manifest_branch = schedule.manifest_branch 135 new_job.build_target = schedule.build_target 136 new_job.shards = schedule.shards 137 new_job.param = schedule.param 138 new_job.retry_count = schedule.retry_count 139 new_job.gsi_storage_type = schedule.gsi_storage_type 140 new_job.gsi_branch = schedule.gsi_branch 141 new_job.gsi_build_target = schedule.gsi_build_target 142 new_job.gsi_pab_account_id = schedule.gsi_pab_account_id 143 new_job.test_storage_type = schedule.test_storage_type 144 new_job.test_branch = schedule.test_branch 145 new_job.test_build_target = schedule.test_build_target 146 new_job.test_pab_account_id = ( 147 schedule.test_pab_account_id) 148 149 new_job.build_id = "" 150 151 if new_job.build_storage_type == ( 152 Status.STORAGE_TYPE_DICT["PAB"]): 153 new_job.build_id = self.FindBuildId(new_job) 154 if new_job.build_id: 155 self.ReserveDevices(target_device_serials) 156 new_job.status = Status.JOB_STATUS_DICT[ 157 "ready"] 158 new_job.timestamp = datetime.datetime.now() 159 new_job.put() 160 self.logger.Println("NEW JOB") 161 else: 162 self.logger.Println("NO BUILD FOUND") 163 elif new_job.build_storage_type == ( 164 Status.STORAGE_TYPE_DICT["GCS"]): 165 new_job.status = Status.JOB_STATUS_DICT["ready"] 166 new_job.timestamp = datetime.datetime.now() 167 new_job.put() 168 self.logger.Println("NEW JOB - GCS") 169 else: 170 self.logger.Println("Unexpected storage type.") 171 172 self.logger.Unindent() 173 174 self.response.write( 175 "<pre>\n" + "\n".join(self.logger.Get()) + "\n</pre>") 176 177 def NewPeriod(self, schedule): 178 """Checks whether a new job creation is needed. 179 180 Args: 181 schedule: a proto containing schedule information. 182 183 Returns: 184 True if new job is required, False otherwise. 185 """ 186 job_query = model.JobModel.query( 187 model.JobModel.manifest_branch == schedule.manifest_branch, 188 model.JobModel.build_target == schedule.build_target, 189 model.JobModel.test_name == schedule.test_name, 190 model.JobModel.period == schedule.period, 191 model.JobModel.shards == schedule.shards, 192 model.JobModel.retry_count == schedule.retry_count, 193 model.JobModel.gsi_branch == schedule.gsi_branch, 194 model.JobModel.test_branch == schedule.test_branch) 195 same_jobs = job_query.fetch() 196 same_jobs = [ 197 x for x in same_jobs 198 if (set(x.param) == set(schedule.param) 199 and x.device in schedule.device) 200 ] 201 if not same_jobs: 202 return True 203 204 outdated_jobs = [ 205 x for x in same_jobs 206 if (datetime.datetime.now() - x.timestamp > datetime.timedelta( 207 minutes=x.period)) 208 ] 209 outdated_ready_jobs = [ 210 x for x in outdated_jobs 211 if x.status == Status.JOB_STATUS_DICT["expired"] 212 ] 213 214 if outdated_ready_jobs: 215 msg = ("Job key[{}] is(are) outdated. " 216 "They became infra-err status.").format( 217 ", ".join( 218 [str(x.key.id()) for x in outdated_ready_jobs])) 219 logging.debug(msg) 220 self.logger.Println(msg) 221 for job in outdated_ready_jobs: 222 job.status = Status.JOB_STATUS_DICT["infra-err"] 223 job.put() 224 225 outdated_leased_jobs = [ 226 x for x in outdated_jobs 227 if x.status == Status.JOB_STATUS_DICT["leased"] 228 ] 229 if outdated_leased_jobs: 230 msg = ("Job key[{}] is(are) expected to be completed " 231 "however still in leased status.").format( 232 ", ".join( 233 [str(x.key.id()) for x in outdated_leased_jobs])) 234 logging.debug(msg) 235 self.logger.Println(msg) 236 237 recent_jobs = [x for x in same_jobs if x not in outdated_jobs] 238 239 if recent_jobs or outdated_leased_jobs: 240 return False 241 else: 242 return True 243 244 def SelectTargetLab(self, schedule): 245 """Find target host and devices to schedule a new job. 246 247 Args: 248 schedule: a proto containing the information of a schedule. 249 250 Returns: 251 a string which represents hostname, 252 a string containing target lab and product with '/' separator, 253 a list of selected devices serial (see whether devices will be 254 selected later when the job is picked up.) 255 """ 256 for target_device in schedule.device: 257 if "/" not in target_device: 258 # device malformed 259 continue 260 261 target_lab, target_product_type = target_device.split("/") 262 self.logger.Println("- Seeking product %s in lab %s" % 263 (target_product_type, target_lab)) 264 self.logger.Indent() 265 lab_query = model.LabModel.query(model.LabModel.name == target_lab) 266 target_labs = lab_query.fetch() 267 268 available_devices = {} 269 if target_labs: 270 for lab in target_labs: 271 self.logger.Println("- target lab found") 272 self.logger.Println("- target device %s %s" % 273 (lab.hostname, target_product_type)) 274 self.logger.Indent() 275 device_query = model.DeviceModel.query( 276 model.DeviceModel.hostname == lab.hostname) 277 host_devices = device_query.fetch() 278 279 for device in host_devices: 280 self.logger.Println("- check device %s %s" % 281 (device.status, device.product)) 282 if ((device.status in [ 283 Status.DEVICE_STATUS_DICT["fastboot"], 284 Status.DEVICE_STATUS_DICT["online"], 285 Status.DEVICE_STATUS_DICT["ready"] 286 ]) and (device.scheduling_status == 287 Status.DEVICE_SCHEDULING_STATUS_DICT["free"]) 288 and device.product == target_product_type): 289 self.logger.Println( 290 "- a device found %s" % device.serial) 291 if device.hostname not in available_devices: 292 available_devices[device.hostname] = set() 293 available_devices[device.hostname].add( 294 device.serial) 295 self.logger.Unindent() 296 for host in available_devices: 297 self.logger.Println("- len(devices) %s >= shards %s ?" % 298 (len(available_devices[host]), 299 schedule.shards)) 300 if len(available_devices[host]) >= schedule.shards: 301 self.logger.Unindent() 302 return host, target_device, list( 303 available_devices[host])[:schedule.shards] 304 self.logger.Unindent() 305 return None, None, [] 306