#pylint: disable-msg=C0111

""" This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
a process. Examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent until the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
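
The agent/dispatcher loop above can be sketched roughly as follows. This is a
minimal, self-contained illustration and not the scheduler's actual code:
SketchDispatcher, SketchAgent, CountdownTask and run_until_idle are
hypothetical stand-ins, and only the poll()/is_done() contract is taken from
this module.

```python
class CountdownTask(object):
    # Hypothetical task that reports done after a fixed number of polls.
    def __init__(self, polls_needed):
        self._remaining = polls_needed
        self.done = False

    def poll(self):
        if self.done:
            return
        self._remaining -= 1
        if self._remaining <= 0:
            self.done = True

    def is_done(self):
        return self.done


class SketchAgent(object):
    # Hypothetical bridge: the dispatcher only ever talks to the agent.
    def __init__(self, task):
        self.task = task
        self.finished = False

    def tick(self):
        self.task.poll()
        if self.task.is_done():
            self.finished = True


class SketchDispatcher(object):
    # Hypothetical dispatcher holding the queue of live agents.
    def __init__(self):
        self.agents = []

    def add_agent(self, agent):
        self.agents.append(agent)

    def handle_agents(self):
        # Tick a snapshot of the queue, then drop agents that finished.
        for agent in list(self.agents):
            agent.tick()
            if agent.finished:
                self.agents.remove(agent)


def run_until_idle(dispatcher, max_ticks=100):
    # Drive scheduler ticks until no agents remain; return the tick count.
    ticks = 0
    while dispatcher.agents and ticks < max_ticks:
        dispatcher.handle_agents()
        ticks += 1
    return ticks
```

In this sketch an agent leaves the queue on the same tick in which its task
sets done, which mirrors "agent is removed from dispatcher" above.
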
AgentTasks:
    Basic AgentTasks are created when an hqe changes state. Examples of these
    are the QueueTask, which is created when an hqe goes into the Starting
    state, and the FinalReparseTask, which is created when the hqe goes into
    parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry map
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows:
At a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)
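
The task lifecycle above can be sketched in a few lines. This is a simplified
illustration under stated assumptions, not the real implementation:
FakeMonitor is a hypothetical stand-in for PidfileRunMonitor, SketchTask is a
hypothetical task, and the events list exists only to make the call order
visible.

```python
class FakeMonitor(object):
    # Hypothetical monitor: returns None while the process is "running",
    # then a fixed exit code on every later call.
    def __init__(self, polls_until_exit, final_exit_code):
        self._polls_left = polls_until_exit
        self._final_exit_code = final_exit_code

    def exit_code(self):
        if self._polls_left > 0:
            self._polls_left -= 1
            return None
        return self._final_exit_code


class SketchTask(object):
    def __init__(self, monitor):
        self._pending_monitor = monitor
        self.monitor = None
        self.started = False
        self.done = False
        self.success = None
        self.events = []

    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()

    def start(self):
        self.events.append('prolog')
        # Stands in for run(), which creates the monitor in the real task.
        self.monitor = self._pending_monitor
        self.events.append('run')
        self.started = True

    def tick(self):
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return
        self.finished(exit_code == 0)

    def finished(self, success):
        if self.done:
            return
        self.done = True
        self.success = success
        self.events.append('epilog')
        self.events.append('cleanup')
```

Polling a SketchTask built on FakeMonitor(2, 0) until done is set takes three
polls: the first poll starts the task and also ticks it once, and the third
poll sees exit code 0 and calls finished(True).
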

The first special task for an HQE is usually Reset.
- poll: The first poll starts the task and then ticks it; polls thereafter
        only call the task's tick method. A started task will have the
        started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen before
              the actual task. Different per Task. Examples of things that might
              happen in a prolog:
                  - state of Host, HQE (to something like Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc to drone manager.
          This will start the actual autoserv process.
    - set the start bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in run.
        If the autoserv process has finished we call finished with true/false
        depending on autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    tell when its task has finished.

            - epilog: Is generally where we set status of the Host/HQE again,
                      requeue any other task that needs to run after this one
                      and perform cleanup. Just like the prolog, this step is
                      different per task.

                      - cleanup: Sets the is_active, is_complete and success
                                 states on the task's model. Also uses the
                                 drone_manager to:
                                    unregister the pidfile
                                    copy results of the task
                                 (Note this is not to be confused with the
                                  special task called cleanup).

                      The actions we take in the epilog are based on the
                      success/failure of the autoserv process set in cleanup,
                      e.g. if reset failed we will enqueue a repair, but if all
                      is well the epilog will just return. Prejob task epilogs
                      also have an on_pending method that changes the status of
                      the HQE to pending/starting, which gets picked up in the
                      scheduler.
By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
            |--->FinalReparseTask
            |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils

CONFIG = global_config.global_config
AUTOSERV_NICE_LEVEL = 10

ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
            self.hostnames = dict((entry.host.id, entry.host.hostname)
                                  for entry in queue_entries)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        # The first poll starts the task; every poll ticks until done is set.
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # The process has not exited yet.
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        # Only the first call has an effect, so epilog runs at most once.
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        # Kill the process if there is one, then clean up without an epilog.
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            metadata = {
                    '_type': 'scheduler_error',
                    'error': 'No paired results in task',
                    'task': str(self),
                    'pidfile_id': str(self._paired_with_monitor().pidfile_id)}
            autotest_stats.Counter('no_paired_results_in_task',
                                   metadata=metadata).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the hosts are split between
                # restricted and unrestricted subnets. No drone can work in
                # such a case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUT in restricted subnet, '
                                  'but some in unrestricted subnet. Therefore, '
                                  'no drone is available to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in restricted subnet, drone can only '
                             'be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, that means no drone is
                # allowed to run the task. This is different from None, which
                # means all drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) is in restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If host is not in restricted subnet, use the unrestricted drones only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        if filtered_drones:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(
                self._working_directory(), keyval_contents,
                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like id,
        name and related job information. The value will be stored in metadata
        database.
        @return: A dictionary containing the task id, name and related job id.
                 If any attribute cannot be accessed, an empty dictionary is
                 returned and an error is logged.
        """
        try:
            metadata = {'task_id': self.task.id, 'task_name': self.task.task,
                        'hostname': self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(self.host.hostname,
                                                     self._extra_command_args,
                                                     queue_entry=self.queue_entry,
                                                     in_lab=True)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove queued special tasks of a given type for this task's host.

        @param special_task_to_remove: type of special task to be removed, e.g.,
            models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
            the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=special_task_to_remove,
            is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]
    743