# pylint: disable=missing-docstring

"""This is the module for everything related to the AgentTask.

The AgentTask imposes an interface through which the scheduler can monitor
a process; examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent till the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an hqe changes state. Examples of these
    are the QueueTask, which is created when a hqe goes into the Starting state
    and the FinalReparseTask, which is created when the hqe goes into parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows:
At a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)

The first special task for an HQE is usually Reset.
-poll: The first poll will start the task, polls thereafter will call the
       task's tick method. A started task will have the started bit set.
    - start: Call prolog, run the process and set the start bit.
        - prolog: Usually where one puts any model state changes that happen
                  before the actual task. Different per Task. Examples of
                  things that might happen in a prolog:
                    - state of Host, HQE (to something like Resetting)
                    - delete any unwanted queued special tasks
                    - register a pidfile
                    - set the is_active bit on the special task
        - run:
            - create a PidfileRunMonitor
            - pass the autoserv command, working directory etc to drone
              manager. This will start the actual autoserv process.
        - set the start bit: so subsequent polls do not 'start' again

    - tick: For as long as a started task's done bit is not set, a poll will
            lead to a tick. The tick monitors the pid file of the autoserv
            process running on the drone through the PidfileRunMonitor created
            in run. If the autoserv process has finished we call finished with
            true/false depending on autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    measure the success or failure of its task.

            - epilog: Is generally where we set status of the Host/HQE again,
                      requeue any other task that needs to run after this one
                      and perform cleanup. Just like the prolog, this step is
                      different per task.

                      - cleanup: Sets the is_active and is_complete and success
                                 states on the task's model. Also uses the
                                 drone_manager to:
                                    unregister the pidfile
                                    copy results of the task
                                 (Note this is not to be confused with the
                                  special task called cleanup).

                      The actions we take in the epilog are based on the
                      success/failure of the autoserv process set in cleanup,
                      eg: if reset failed we will enqueue a repair, but if all
                      is well the epilog will just return. Prejob task epilogs
                      also have an on_pending method that changes the status of
                      the HQE to pending/starting, which gets picked up in the
                      scheduler.
90 By this point the is_done flag is set, which results in the Agent noticing that 91 the task has finished and unregistering it from the dispatcher.Class hierarchy: 92 AgentTask 93 |--->SpecialAgentTask (prejob_task.py) 94 |--->RepairTask 95 |--->PreJobTask 96 |--->Verify, Cleanup, Reset, Provision 97 98 |--->AbstractQueueTask (monitor_db.py) 99 |--->QueueTask 100 |--->HostlessQueueTask 101 102 |--->PostJobTask (postjob_task.py) 103 |--->GatherLogsTask 104 |--->SelfThrottledPostJobTask 105 |--->FinalReparseTask 106 107 """ 108 109 import logging 110 import os 111 import time 112 import urllib 113 114 import common 115 116 from autotest_lib.client.common_lib import global_config 117 from autotest_lib.client.common_lib import utils 118 from autotest_lib.frontend.afe import models 119 from autotest_lib.scheduler import drone_manager 120 from autotest_lib.scheduler import email_manager 121 from autotest_lib.scheduler import pidfile_monitor 122 from autotest_lib.scheduler import rdb_lib 123 from autotest_lib.scheduler import scheduler_lib 124 from autotest_lib.scheduler import scheduler_models 125 from autotest_lib.server import autoserv_utils 126 from autotest_lib.server import system_utils 127 128 try: 129 from chromite.lib import metrics 130 except ImportError: 131 metrics = utils.metrics_mock 132 133 134 CONFIG = global_config.global_config 135 AUTOSERV_NICE_LEVEL = 10 136 137 ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value( 138 'CROS', 'enable_drone_in_restricted_subnet', type=bool, 139 default=False) 140 141 142 class AgentTask(object): 143 class _NullMonitor(object): 144 pidfile_id = None 145 146 def has_process(self): 147 return True 148 149 150 def __init__(self, log_file_name=None): 151 """ 152 @param log_file_name: (optional) name of file to log command output to 153 """ 154 self._drone_manager = drone_manager.instance() 155 self.done = False 156 self.started = False 157 self.success = None 158 self.aborted = False 159 self.monitor = None 160 
self.queue_entry_ids = [] 161 self.host_ids = [] 162 # A map between host id and hostname. 163 self.hostnames = {} 164 self._log_file_name = log_file_name 165 166 167 def _set_ids(self, host=None, queue_entries=None): 168 if queue_entries and queue_entries != [None]: 169 self.host_ids = [] 170 self.queue_entry_ids = [] 171 self.hostnames = {} 172 for entry in queue_entries: 173 if entry.host is not None: 174 self.host_ids.append(entry.host.id) 175 self.queue_entry_ids.append(entry.id) 176 self.hostnames[entry.host.id] = entry.host.hostname 177 else: 178 logging.debug( 179 'No host is found for host_queue_entry_id: %r', 180 entry.id) 181 raise scheduler_lib.NoHostIdError( 182 'Failed to schedule a job whose ' 183 'host_queue_entry_id=%r due to no host_id.' 184 % entry.id) 185 else: 186 assert host 187 self.host_ids = [host.id] 188 self.hostnames = {host.id: host.hostname} 189 190 191 def poll(self): 192 if not self.started: 193 self.start() 194 if not self.done: 195 self.tick() 196 197 198 def tick(self): 199 assert self.monitor 200 exit_code = self.monitor.exit_code() 201 if exit_code is None: 202 return 203 204 success = (exit_code == 0) 205 self.finished(success) 206 207 208 def is_done(self): 209 return self.done 210 211 212 def finished(self, success): 213 if self.done: 214 assert self.started 215 return 216 self.started = True 217 self.done = True 218 self.success = success 219 self.epilog() 220 221 222 def prolog(self): 223 """ 224 To be overridden. 225 """ 226 assert not self.monitor 227 self.register_necessary_pidfiles() 228 229 230 def _log_file(self): 231 if not self._log_file_name: 232 return None 233 return os.path.join(self._working_directory(), self._log_file_name) 234 235 236 def cleanup(self): 237 log_file = self._log_file() 238 if self.monitor and log_file: 239 self.monitor.try_copy_to_results_repository(log_file) 240 241 242 def epilog(self): 243 """ 244 To be overridden. 
245 """ 246 self.cleanup() 247 logging.info("%s finished with success=%s", type(self).__name__, 248 self.success) 249 250 251 def start(self): 252 if not self.started: 253 self.prolog() 254 self.run() 255 256 self.started = True 257 258 259 def abort(self): 260 if self.monitor: 261 self.monitor.kill() 262 self.done = True 263 self.aborted = True 264 self.cleanup() 265 266 267 def _get_consistent_execution_path(self, execution_entries): 268 first_execution_path = execution_entries[0].execution_path() 269 for execution_entry in execution_entries[1:]: 270 assert execution_entry.execution_path() == first_execution_path, ( 271 '%s (%s) != %s (%s)' % (execution_entry.execution_path(), 272 execution_entry, 273 first_execution_path, 274 execution_entries[0])) 275 return first_execution_path 276 277 278 def _copy_results(self, execution_entries, use_monitor=None): 279 """ 280 @param execution_entries: list of objects with execution_path() method 281 """ 282 if use_monitor is not None and not use_monitor.has_process(): 283 return 284 285 assert len(execution_entries) > 0 286 if use_monitor is None: 287 assert self.monitor 288 use_monitor = self.monitor 289 assert use_monitor.has_process() 290 execution_path = self._get_consistent_execution_path(execution_entries) 291 results_path = execution_path + '/' 292 use_monitor.try_copy_to_results_repository(results_path) 293 294 295 def _parse_results(self, queue_entries): 296 for queue_entry in queue_entries: 297 queue_entry.set_status(models.HostQueueEntry.Status.PARSING) 298 299 300 def _command_line(self): 301 """ 302 Return the command line to run. Must be overridden. 303 """ 304 raise NotImplementedError 305 306 307 @property 308 def num_processes(self): 309 """ 310 Return the number of processes forked by this AgentTask's process. 311 It may only be approximate. To be overridden if necessary. 
312 """ 313 return 1 314 315 316 def _paired_with_monitor(self): 317 """ 318 If this AgentTask's process must run on the same machine as some 319 previous process, this method should be overridden to return a 320 PidfileRunMonitor for that process. 321 """ 322 return self._NullMonitor() 323 324 325 @property 326 def owner_username(self): 327 """ 328 Return login of user responsible for this task. May be None. Must be 329 overridden. 330 """ 331 raise NotImplementedError 332 333 334 def _working_directory(self): 335 """ 336 Return the directory where this AgentTask's process executes. 337 Must be overridden. 338 """ 339 raise NotImplementedError 340 341 342 def _pidfile_name(self): 343 """ 344 Return the name of the pidfile this AgentTask's process uses. To be 345 overridden if necessary. 346 """ 347 return drone_manager.AUTOSERV_PID_FILE 348 349 350 def _check_paired_results_exist(self): 351 if not self._paired_with_monitor().has_process(): 352 metrics.Counter( 353 'chromeos/autotest/errors/scheduler/no_paired_results' 354 ).increment() 355 self.finished(False) 356 return False 357 return True 358 359 360 def _create_monitor(self): 361 assert not self.monitor 362 self.monitor = pidfile_monitor.PidfileRunMonitor() 363 364 365 def run(self): 366 if not self._check_paired_results_exist(): 367 return 368 369 self._create_monitor() 370 self.monitor.run( 371 self._command_line(), self._working_directory(), 372 num_processes=self.num_processes, 373 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(), 374 pidfile_name=self._pidfile_name(), 375 paired_with_pidfile=self._paired_with_monitor().pidfile_id, 376 username=self.owner_username, 377 drone_hostnames_allowed=self.get_drone_hostnames_allowed()) 378 379 380 def get_drone_hostnames_allowed( 381 self, restricted_subnets=utils.RESTRICTED_SUBNETS, 382 enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET): 383 filtered_drones = None 384 has_unrestricted_host = False 385 if (self.hostnames and restricted_subnets and 
enable_drone_in_subnet): 386 for hostname in self.hostnames.values(): 387 subnet = utils.get_restricted_subnet(hostname, 388 restricted_subnets) 389 390 # Return an empty set if the list of hosts exists both in 391 # restricted and unrestricted subnet. No drone can work in such 392 # case. 393 if ((not subnet and filtered_drones is not None) or 394 (subnet and has_unrestricted_host)): 395 logging.error('The test has some DUT in restricted subnet, ' 396 'but some in unrestricted subnet. Therefore, ' 397 'no drone is available to run the test.') 398 return set() 399 400 if not subnet: 401 has_unrestricted_host = True 402 continue 403 404 server_ip_map=system_utils.DroneCache.get_drone_ip_map() 405 filtered_drones_for_host = set( 406 utils.get_servers_in_same_subnet( 407 subnet[0], subnet[1], 408 server_ip_map=server_ip_map)) 409 logging.info('DUT %s is in restricted subnet, drone can only ' 410 'be chosen from %s', hostname, 411 filtered_drones_for_host) 412 if filtered_drones is None: 413 filtered_drones = filtered_drones_for_host 414 else: 415 filtered_drones = set.intersection( 416 filtered_drones, filtered_drones_for_host) 417 418 # If filtered_drones is an empty set, that means no drone is 419 # allowed to run the task. This is different fron None, which 420 # means all drones are allowed. 421 if filtered_drones == set(): 422 logging.error('DUT(s) is in restricted subnet, but no ' 423 'drone is available to run the test.') 424 return filtered_drones 425 426 # If host is not in restricted subnet, use the unrestricted drones only. 
427 if (filtered_drones is None and restricted_subnets and 428 enable_drone_in_subnet): 429 filtered_drones = set( 430 system_utils.DroneCache.get_unrestricted_drones( 431 restricted_subnets=restricted_subnets)) 432 433 if not models.DroneSet.drone_sets_enabled(): 434 return filtered_drones 435 436 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids) 437 if not hqes: 438 # Only special tasks could be missing host queue entries 439 assert isinstance(self, SpecialAgentTask) 440 return self._user_or_global_default_drone_set( 441 self.task, self.task.requested_by) 442 443 job_ids = hqes.values_list('job', flat=True).distinct() 444 assert job_ids.count() == 1, ("AgentTask's queue entries " 445 "span multiple jobs") 446 447 job = models.Job.objects.get(id=job_ids[0]) 448 drone_set = job.drone_set 449 if not drone_set: 450 return self._user_or_global_default_drone_set(job, job.user()) 451 452 if filtered_drones: 453 return set.intersection(filtered_drones, 454 drone_set.get_drone_hostnames()) 455 else: 456 return drone_set.get_drone_hostnames() 457 458 459 def _user_or_global_default_drone_set(self, obj_with_owner, user): 460 """ 461 Returns the user's default drone set, if present. 462 463 Otherwise, returns the global default drone set. 
464 """ 465 default_hostnames = models.DroneSet.get_default().get_drone_hostnames() 466 if not user: 467 logging.warning('%s had no owner; using default drone set', 468 obj_with_owner) 469 return default_hostnames 470 if not user.drone_set: 471 logging.warning('User %s has no default drone set, using global ' 472 'default', user.login) 473 return default_hostnames 474 return user.drone_set.get_drone_hostnames() 475 476 477 def register_necessary_pidfiles(self): 478 pidfile_id = self._drone_manager.get_pidfile_id_from( 479 self._working_directory(), self._pidfile_name()) 480 self._drone_manager.register_pidfile(pidfile_id) 481 482 paired_pidfile_id = self._paired_with_monitor().pidfile_id 483 if paired_pidfile_id: 484 self._drone_manager.register_pidfile(paired_pidfile_id) 485 486 487 def recover(self): 488 if not self._check_paired_results_exist(): 489 return 490 491 self._create_monitor() 492 self.monitor.attach_to_existing_process( 493 self._working_directory(), pidfile_name=self._pidfile_name(), 494 num_processes=self.num_processes) 495 if not self.monitor.has_process(): 496 # no process to recover; wait to be started normally 497 self.monitor = None 498 return 499 500 self.started = True 501 logging.info('Recovering process %s for %s at %s', 502 self.monitor.get_process(), type(self).__name__, 503 self._working_directory()) 504 505 506 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, 507 allowed_host_statuses=None): 508 class_name = self.__class__.__name__ 509 for entry in queue_entries: 510 if entry.status not in allowed_hqe_statuses: 511 # In the orignal code, here we raise an exception. In an 512 # effort to prevent downtime we will instead abort the job and 513 # send out an email notifying us this has occured. 514 error_message = ('%s attempting to start entry with invalid ' 515 'status %s: %s. Aborting Job: %s.' 
516 % (class_name, entry.status, entry, 517 entry.job)) 518 logging.error(error_message) 519 email_manager.manager.enqueue_notify_email( 520 'Job Aborted - Invalid Host Queue Entry Status', 521 error_message) 522 entry.job.request_abort() 523 invalid_host_status = ( 524 allowed_host_statuses is not None 525 and entry.host.status not in allowed_host_statuses) 526 if invalid_host_status: 527 # In the orignal code, here we raise an exception. In an 528 # effort to prevent downtime we will instead abort the job and 529 # send out an email notifying us this has occured. 530 error_message = ('%s attempting to start on queue entry with ' 531 'invalid host status %s: %s. Aborting Job: %s' 532 % (class_name, entry.host.status, entry, 533 entry.job)) 534 logging.error(error_message) 535 email_manager.manager.enqueue_notify_email( 536 'Job Aborted - Invalid Host Status', error_message) 537 entry.job.request_abort() 538 539 540 class TaskWithJobKeyvals(object): 541 """AgentTask mixin providing functionality to help with job keyval files.""" 542 _KEYVAL_FILE = 'keyval' 543 def _format_keyval(self, key, value): 544 return '%s=%s' % (key, value) 545 546 547 def _keyval_path(self): 548 """Subclasses must override this""" 549 raise NotImplementedError 550 551 552 def _write_keyval_after_job(self, field, value): 553 assert self.monitor 554 if not self.monitor.has_process(): 555 return 556 self._drone_manager.write_lines_to_file( 557 self._keyval_path(), [self._format_keyval(field, value)], 558 paired_with_process=self.monitor.get_process()) 559 560 561 def _job_queued_keyval(self, job): 562 return 'job_queued', int(time.mktime(job.created_on.timetuple())) 563 564 565 def _write_job_finished(self): 566 self._write_keyval_after_job("job_finished", int(time.time())) 567 568 569 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path): 570 keyval_contents = '\n'.join(self._format_keyval(key, value) 571 for key, value in keyval_dict.iteritems()) 572 # always end with a 
newline to allow additional keyvals to be written 573 keyval_contents += '\n' 574 self._drone_manager.attach_file_to_execution(self._working_directory(), 575 keyval_contents, 576 file_path=keyval_path) 577 578 579 def _write_keyvals_before_job(self, keyval_dict): 580 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path()) 581 582 583 def _write_host_keyvals(self, host): 584 keyval_path = os.path.join(self._working_directory(), 'host_keyvals', 585 host.hostname) 586 platform, all_labels = host.platform_and_labels() 587 all_labels = [ urllib.quote(label) for label in all_labels ] 588 keyval_dict = dict(platform=platform, labels=','.join(all_labels)) 589 self._write_keyvals_before_job_helper(keyval_dict, keyval_path) 590 591 592 class SpecialAgentTask(AgentTask, TaskWithJobKeyvals): 593 """ 594 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB. 595 """ 596 597 TASK_TYPE = None 598 host = None 599 queue_entry = None 600 _COUNT_METRIC = 'chromeos/autotest/scheduler/special_task_count' 601 _DUT_METRIC = 'chromeos/autotest/scheduler/special_task_by_dut' 602 _DURATION_METRIC = 'chromeos/autotest/scheduler/special_task_durations' 603 604 605 def __init__(self, task, extra_command_args): 606 super(SpecialAgentTask, self).__init__() 607 608 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden' 609 610 self.host = rdb_lib.get_hosts([task.host.id])[0] 611 self.host.dbg_str = 'Task: %s' % str(task) 612 self.queue_entry = None 613 if task.queue_entry: 614 self.queue_entry = scheduler_models.HostQueueEntry( 615 id=task.queue_entry.id) 616 self.host.dbg_str += self.queue_entry.get_dbg_str() 617 618 # This is of type SpecialTask (as defined in frontend/afe/models.py) 619 self.task = task 620 self._extra_command_args = extra_command_args 621 self.host.metadata = self.get_metadata() 622 self._milestone = '' 623 624 625 def get_metadata(self): 626 """Get a dictionary that contains task information. 
627 628 The return value is a dictionary that includes task information like id, 629 name and related job information. The value will be stored in metadata 630 database. 631 @return: A dictionary containing the task id, name and related job id. 632 If some attributes are failed to be accessed, an empty 633 dictionary will be returned, and error will be logged. 634 """ 635 try: 636 metadata = {'task_id':self.task.id, 'task_name':self.task.task, 637 'hostname':self.task.host.hostname} 638 if self.task.queue_entry: 639 job = self.task.queue_entry.job 640 metadata.update( 641 scheduler_models.get_job_metadata(job)) 642 return metadata 643 except AttributeError as e: 644 logging.error('Task has missing attribute: %s', e) 645 return {} 646 647 648 def _keyval_path(self): 649 return os.path.join(self._working_directory(), self._KEYVAL_FILE) 650 651 652 def _command_line(self): 653 return autoserv_utils._autoserv_command_line(self.host.hostname, 654 self._extra_command_args, 655 queue_entry=self.queue_entry, 656 in_lab=True) 657 658 659 def _working_directory(self): 660 return self.task.execution_path() 661 662 663 @property 664 def owner_username(self): 665 if self.task.requested_by: 666 return self.task.requested_by.login 667 return None 668 669 670 def prolog(self): 671 super(SpecialAgentTask, self).prolog() 672 self.task.activate() 673 self._write_host_keyvals(self.host) 674 675 676 def _fail_queue_entry(self): 677 assert self.queue_entry 678 679 if self.queue_entry.meta_host: 680 return # don't fail metahost entries, they'll be reassigned 681 682 self.queue_entry.update_from_database() 683 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED: 684 return # entry has been aborted 685 686 self._actually_fail_queue_entry() 687 688 689 def epilog(self): 690 super(SpecialAgentTask, self).epilog() 691 self._emit_special_task_status_metric() 692 693 694 def _emit_special_task_status_metric(self): 695 """Increments an accumulator associated with this special 
task.""" 696 fields = {'type': self.TASK_TYPE, 697 'success': bool(self.success), 698 'board': str(self.host.board), 699 'milestone': self._milestone} 700 metrics.Counter(self._COUNT_METRIC).increment( 701 fields=fields) 702 703 if (self.task.time_finished and self.task.time_started): 704 duration = (self.task.time_finished - 705 self.task.time_started).total_seconds() 706 metrics.SecondsDistribution(self._DURATION_METRIC).add( 707 duration, fields=fields) 708 709 dut_fields = { 710 'type': self.TASK_TYPE, 711 'success': bool(self.success), 712 'board': str(self.host.board), 713 'dut_host_name': self.host.hostname 714 } 715 metrics.Counter(self._DUT_METRIC).increment(fields=dut_fields) 716 717 # TODO(milleral): http://crbug.com/268607 718 # All this used to be a part of _fail_queue_entry. The 719 # exact semantics of when one should and should not be failing a queue 720 # entry need to be worked out, because provisioning has placed us in a 721 # case where we want to fail a queue entry that could be requeued, 722 # which makes us fail the two above if statements, and thus 723 # _fail_queue_entry() would exit early and have no effect. 724 # What's left here with _actually_fail_queue_entry is a hack to be able to 725 # bypass the checks and unconditionally execute the code. 
726 def _actually_fail_queue_entry(self): 727 self.queue_entry.set_execution_subdir() 728 queued_key, queued_time = self._job_queued_keyval( 729 self.queue_entry.job) 730 self._write_keyval_after_job(queued_key, queued_time) 731 self._write_job_finished() 732 733 # copy results logs into the normal place for job results 734 self.monitor.try_copy_results_on_drone( 735 source_path=self._working_directory() + '/', 736 destination_path=self.queue_entry.execution_path() + '/') 737 738 pidfile_id = self._drone_manager.get_pidfile_id_from( 739 self.queue_entry.execution_path(), 740 pidfile_name=drone_manager.AUTOSERV_PID_FILE) 741 self._drone_manager.register_pidfile(pidfile_id) 742 743 # TODO(ayatane): This should obey self.queue_entry.job.parse_failed_repair 744 # But nothing sets self.queue_entry.job.parse_failed_repair? 745 # Check Git blame 746 self._parse_results([self.queue_entry]) 747 748 # Also fail all other special tasks that have not yet run for this HQE 749 pending_tasks = models.SpecialTask.objects.filter( 750 queue_entry__id=self.queue_entry.id, 751 is_complete=0) 752 for task in pending_tasks: 753 task.finish(False) 754 755 756 def cleanup(self): 757 super(SpecialAgentTask, self).cleanup() 758 759 # We will consider an aborted task to be "Failed" 760 self.task.finish(bool(self.success)) 761 762 if self.monitor: 763 if self.monitor.has_process(): 764 self._copy_results([self.task]) 765 if self.monitor.pidfile_id is not None: 766 self._drone_manager.unregister_pidfile(self.monitor.pidfile_id) 767 768 769 def remove_special_tasks(self, special_task_to_remove, keep_last_one=False): 770 """Remove a type of special task in all tasks, keep last one if needed. 771 772 @param special_task_to_remove: type of special task to be removed, e.g., 773 models.SpecialTask.Task.VERIFY. 774 @param keep_last_one: True to keep the last special task if its type is 775 the same as of special_task_to_remove. 
776 777 """ 778 queued_special_tasks = models.SpecialTask.objects.filter( 779 host__id=self.host.id, 780 task=special_task_to_remove, 781 is_active=False, is_complete=False, queue_entry=None) 782 if keep_last_one: 783 queued_special_tasks = queued_special_tasks.exclude(id=self.task.id) 784 queued_special_tasks.delete() 785 786 787 def _generate_autoserv_label_args(self, task): 788 """ 789 @param task: An instance of afe model's SpecialTask. 790 @returns: The list of arguments to pass to autoserv to tell it what the 791 labels of a job are. 792 793 """ 794 labels = {x.name for x in task.queue_entry.job.labels} 795 return ['--job-labels', ','.join(labels)] 796