#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Scheduler library classes.
"""

import collections
import logging

import common

from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import scheduler_lib


_job_timer = autotest_stats.Timer('scheduler.job_query_manager')
class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""

    # A subquery to only get inactive hostless jobs.
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    @_job_timer.decorate
    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority
        2. With hosts and metahosts: This will only happen if we don't
            activate the hqe after assigning a host to it in
            schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
            through the frontend the owner of the job would have chosen
            a host for it.
        4. Without hosts but with metahosts: This is the common case of
            a new test that needs a DUT. We assign a host and set it to
            active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs, that
            will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with metahost can utilize any host that satisfies
          the metahost criterion. This means that if we had scheduled 4 before
          3 there is a good chance that a job which could've used another host,
          will now use the host assigned to a metahost-less job. Given the
          availability of machines in pool:suites, this almost guarantees
          starvation for jobs scheduled through the frontend.
        - Scheduling 4 before 3 also has its pros however, since a suite
          has the concept of a time out, whereas users can wait. If we hit the
          process count on the drone a suite can timeout waiting on the test,
          but a user job generally has a much longer timeout, and relatively
          harmless consequences.
        The current ordering was chosen because it is more likely that we will
        run out of machines in pool:suites than processes on the drone.

        @param only_hostless: If True, restrict the result to entries that
                              match hostless_query (no host and no meta_host).

        @returns A list of HQEs ordered according to sort_order.
        """
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # Don't execute jobs that should be executed by a shard in the global
        # scheduler.
        # This won't prevent the shard scheduler to run this, as the shard db
        # doesn't have an entry in afe_shards_labels.
        # Note the trailing space on the first literal: adjacent literals are
        # concatenated verbatim, so without it the clause would read
        # 'status="Queued"AND NOT aborted'.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted AND afe_shards_labels.id IS NULL')

        # TODO(jakobjuelich, beeps): Optimize this query. Details:
        # Compressed output of EXPLAIN <query>:
        # +------------------------+--------+-------------------------+-------+
        # | table                  | type   | key                     | rows  |
        # +------------------------+--------+-------------------------+-------+
        # | afe_host_queue_entries | ref    | host_queue_entry_status | 30536 |
        # | afe_shards_labels      | ref    | shard_label_id_fk       | 1     |
        # | afe_jobs               | eq_ref | PRIMARY                 | 1     |
        # +------------------------+--------+-------------------------+-------+
        # This shows the first part of the query fetches a lot of objects, that
        # are then filtered. The joins are comparably fast: There's usually just
        # one or none shard mapping that can be answered fully using an index
        # (shard_label_id_fk), similar thing applies to the job.
        #
        # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
        # it might be more efficient to filter on the meta_host first, instead
        # of the status.
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
        return list(scheduler_models.HostQueueEntry.fetch(
            joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
                   'LEFT JOIN afe_shards_labels ON ('
                   'meta_host=afe_shards_labels.label_id)'),
            where=query, order_by=sort_order))


    @_job_timer.decorate
    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, verify, reset and finally provision.

        Only tasks that are neither active nor complete, and whose host is not
        locked, are considered.

        @param only_tasks_with_leased_hosts: If true, this method only returns
            tasks with leased hosts.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])
        if only_tasks_with_leased_hosts:
            queued_tasks = queued_tasks.filter(host__leased=True)

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            # Position in task_priority_order is the rank; list.index raises
            # ValueError for a task type that is not listed above.
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and stand alone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # stand alone special task can share a host with an active hqe, an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
        # Only the set of hosts seen more than once matters, so there is no
        # need to sort the counts.
        host_counts = collections.Counter(hqe_hosts + special_task_hosts)
        multiple_hosts = [host_id for host_id, count in host_counts.items()
                          if count > 1]
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                        'id', 'job_id', 'host_id'))


    @_job_timer.decorate
    def get_suite_host_assignment(self):
        """A helper method to get how many hosts each suite is holding.

        Considers active, incomplete queue entries that have a host and whose
        job has a parent job.

        @return: Two dictionaries (suite_host_num, hosts_to_suites)
                 suite_host_num maps suite job id to number of hosts
                 held by its child jobs.
                 hosts_to_suites contains current hosts held by
                 any suites, and maps the host id to its parent_job_id.
        """
        query = models.HostQueueEntry.objects.filter(
                host_id__isnull=False, complete=0, active=1,
                job__parent_job_id__isnull=False)
        suite_host_num = {}
        hosts_to_suites = {}
        for hqe in query:
            parent_job_id = hqe.job.parent_job_id
            suite_host_num[parent_job_id] = (
                    suite_host_num.get(parent_job_id, 0) + 1)
            hosts_to_suites[hqe.host_id] = parent_job_id
        return suite_host_num, hosts_to_suites


    @_job_timer.decorate
    def get_min_duts_of_suites(self, suite_job_ids):
        """Load suite_min_duts job keyval for a set of suites.

        @param suite_job_ids: A set of suite job ids.

        @return: A dictionary where the key is a suite job id,
                 the value is the value of 'suite_min_duts' converted to int.
                 Suites without a non-null keyval are absent from the result.
        """
        query = models.JobKeyval.objects.filter(
                job_id__in=suite_job_ids,
                key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False)
        return {keyval.job_id: int(keyval.value) for keyval in query}


_host_timer = autotest_stats.Timer('scheduler.host_query_manager')
class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""

    def __init__(self):
        """Create an AFEHostQueryManager.

        The database connection is not passed in; it is obtained from
        scheduler_lib.ConnectionManager().get_connection().
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
        """Group two-column rows into a dict of sets.

        @param rows: An iterable of rows whose first two columns are
                     convertible to int.
        @param flip: If True, key the result on the second column instead of
                     the first.

        @return: A dict mapping each key id to the set of ids paired with it.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_sql_id_list(self, id_list):
        """Render ids as a comma separated string for an SQL IN (...) clause.

        @param id_list: An iterable of ids.

        @return: The str() of each id joined with commas.
        """
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run a two-column query restricted to id_list and group the rows.

        @param query: An SQL string with a single %s placeholder that receives
                      the comma separated id list.
        @param id_list: The ids to substitute into the query.
        @param flip: Passed through to _process_many2many_dict.

        @return: A dict of sets, or an empty dict when id_list is empty (the
                 query is not executed in that case).
        """
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    @_host_timer.decorate
    def _get_ready_hosts(self):
        """Fetch hosts that are not leased, not locked and Ready (or NULL).

        @return: A dict mapping host id to scheduler_models.Host object.
        """
        # We don't lose anything by re-doing these checks
        # even though we release hosts on the same conditions.
        # In the future we might have multiple clients that
        # release_hosts and/or lock them independent of the
        # scheduler tick.
        hosts = scheduler_models.Host.fetch(
            where="NOT afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                          "OR afe_hosts.status = 'Ready')")
        return {host.id: host for host in hosts}


    @_host_timer.decorate
    def _get_job_acl_groups(self, job_ids):
        """Map each job id to the ACL group ids of the job's owner.

        @param job_ids: The job ids to look up.

        @return: A dict of the form {job_id: set(aclgroup_id)}.
        """
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_job_ineligible_hosts(self, job_ids):
        """Map each job id to the host ids listed as ineligible for it.

        @param job_ids: The job ids to look up.

        @return: A dict of the form {job_id: set(host_id)}.
        """
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_job_dependencies(self, job_ids):
        """Map each job id to the label ids it depends on.

        @param job_ids: The job ids to look up.

        @return: A dict of the form {job_id: set(label_id)}.
        """
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_host_acls(self, host_ids):
        """Map each host id to the ACL group ids it belongs to.

        @param host_ids: The host ids to look up.

        @return: A dict of the form {host_id: set(aclgroup_id)}.
        """
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    @_host_timer.decorate
    def _get_label_hosts(self, host_ids):
        """Build label<->host mappings for the given hosts.

        @param host_ids: The host ids to look up.

        @return: A tuple (labels_to_hosts, hosts_to_labels) of dicts of sets;
                 two empty dicts when host_ids is empty.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        # NOTE(review): rows is iterated twice below, which presumes execute()
        # returns a re-iterable sequence rather than a one-shot cursor --
        # confirm against the connection class.
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def find_unused_healty_hosts(cls):
        """Get hosts that are currently unused and in the READY state.

        The query selects hosts that are leased and not locked, whose status
        is NULL or 'Ready', and that have neither an active queue entry nor an
        incomplete special task against them.

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')

        # Avoid any host with a new special task against it. There are 2 cases
        # when an inactive but incomplete special task will not use the host
        # this tick: 1. When the host is locked 2. When an active hqe already
        # has special tasks for the same host. In both these cases this host
        # will not be in the ready hosts list anyway. In all other cases,
        # an incomplete special task will grab the host before a new job does
        # by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        return scheduler_models.Host.fetch(
            joins='%s %s' % (hqe_join, special_task_join),
            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                  "AND afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                          "OR afe_hosts.status = 'Ready')")


    @_host_timer.decorate
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts matching kwargs.

        @param leased_value: The True/False value of the leased column for
                             the matching hosts.
        @param kwargs: The filter arguments used to find matching hosts; they
                       are passed unchanged to models.Host.objects.filter.
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)


    @_host_timer.decorate
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        frequently round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a
            collection of integer label ids. ie. {job_id: [label_id]}
        @return: A dict mapping an integer label id to a scheduler model label
            object. ie. {label_id: label_object}

        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this.
        if job_label_set:
            job_string_label_list = ','.join(str(x) for x in job_label_set)
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    @_host_timer.decorate
    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
                                      need information.
        """
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = (self._get_job_ineligible_hosts(relevant_jobs))
        self._job_dependencies = (self._get_job_dependencies(relevant_jobs))
        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = (
                self._get_label_hosts(host_ids))
        self._labels = self._get_labels(self._job_dependencies)