      1 # pylint: disable=missing-docstring
      2 
      3 """ This is the module for everything related to the AgentTask.
      4 
      5 The AgentTask imposes an interface through which the scheduler can monitor
      6 a process. Examples of such processes include Verify, Cleanup and the Queue
      7 Tasks that run the tests. The scheduler itself only understands Agents.
      8 Agents:
      9     The Agent is the bridge between the scheduler and the AgentTask. The
     10     scheduler's tick has a method called handle_agents, which calls the
     11     tick of each agent in the Dispatcher's queue. This leads to the Agent
     12     polling its AgentTask. The scheduler will keep polling a task through
     13     the associated Agent until the Agent is removed from the dispatcher.
     14 
     15     At a high level:
     16         agents finished = tasks done
     17         agent polls till finished
     18             task polls till done
     19                 task sets done
     20         agent is removed from dispatcher
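
            A rough sketch of this relationship (illustrative only; the real
            Agent and Dispatcher live elsewhere in the scheduler, and the names
            below are simplified assumptions rather than their actual
            definitions):

                class Agent(object):
                    def __init__(self, task):
                        self.task = task

                    def tick(self):
                        # Called from the Dispatcher's handle_agents once per
                        # scheduler tick; drives the AgentTask's poll/tick loop.
                        if not self.task.is_done():
                            self.task.poll()

                # Dispatcher.handle_agents, approximately:
                #     for agent in agents:
                #         agent.tick()
                #         if agent.task.is_done():
                #             remove the agent from the dispatcher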
     21 AgentTasks:
     22     Basic AgentTasks are created when an HQE changes state. Examples are the
     23     QueueTask, which is created when an HQE goes into the Starting state, and
     24     the FinalReparseTask, which is created when the HQE goes into Parsing.
     25 SpecialAgentTasks:
     26     Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
     27     in the afe_special_tasks table. All PreJobTasks are SpecialAgentTasks.
     28 
     29 monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
     30 an AgentTask to an Agent, which the scheduler understands. From this point
     31 onward, the scheduler manages the task through the Agent's interface.
     32 At a high level:
     33     task poll
     34         start
     35             prolog
     36         tick till we get an exit code
     37         finished(exit==0)
     38             done=True
     39             epilog
     40                 cleanup
     41                     set is_active, is_complete, success (checked in scheduler)
     42 
     43 The first special task for an HQE is usually Reset.
     44 - poll: The first poll will start the task; polls thereafter will call the
     45         task's tick method. A started task will have the started bit set.
     46 - start: Call prolog, run the process and set the started bit.
     47     - prolog: Usually where one puts any model state changes that happen before
     48               the actual task. Different per Task. Examples of things that might
     49               happen in a prolog:
     50                   - set the Host/HQE state (to something like Resetting)
     51                   - delete any unwanted queued special tasks
     52                   - register a pidfile
     53                   - set the is_active bit on the special task
     54     - run:
     55         - create a PidfileRunMonitor
     56         - pass the autoserv command, working directory etc to drone manager.
     57           This will start the actual autoserv process.
     58     - set the started bit: so subsequent polls do not 'start' again
     59 
     60 - tick: For as long as a started task's done bit is not set, a poll will lead
     61         to a tick. The tick monitors the pid file of the autoserv process
     62         running on the drone through the PidfileRunMonitor created in run.
     63         If the autoserv process has finished, we call finished with True/False
     64         depending on the autoserv exit code.
     65 
     66         - finished: sets the done and success values, then calls epilog. The
     67                     done bit is important because the Agent polls this bit to
     68                     measure the success or failure of its task.
     69 
     70             - epilog: Generally where we set the status of the Host/HQE again,
     71                       requeue any other task that needs to run after this one
     72                       and perform cleanup. Just like the prolog, this step is
     73                       different per task.
     74 
     75                       - cleanup: Sets the is_active, is_complete and success
     76                                  states on the task's model. Also uses the
     77                                  drone_manager to:
     78                                     unregister the pidfile
     79                                     copy results of the task
     80                                  (Note this is not to be confused with the
     81                                   special task called cleanup).
     82 
     83                       The actions we take in the epilog are based on the
     84                       success/failure of the autoserv process set in cleanup,
     85                       e.g. if Reset failed we will enqueue a repair, but if all
     86                       is well the epilog will just return. Prejob task epilogs
     87                       also have an on_pending method that changes the status of
     88                       the HQE to Pending/Starting, which gets picked up in the
     89                       scheduler.
     90 By this point the is_done flag is set, which results in the Agent noticing that
     91 the task has finished and unregistering it from the dispatcher.

        Class hierarchy:
     92 AgentTask
     93  |--->SpecialAgentTask (prejob_task.py)
     94       |--->RepairTask
     95       |--->PreJobTask
     96            |--->Verify, Cleanup, Reset, Provision
     97 
     98  |--->AbstractQueueTask (monitor_db.py)
     99       |--->QueueTask
    100       |--->HostlessQueueTask
    101 
    102  |--->PostJobTask (postjob_task.py)
    103       |--->GatherLogsTask
    104       |--->SelfThrottledPostJobTask
    105             |--->FinalReparseTask
    106 
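
        A minimal sketch of what a concrete AgentTask must provide (ExampleTask is
        a hypothetical subclass for illustration only; the real subclasses live in
        prejob_task.py, postjob_task.py and monitor_db.py):

            class ExampleTask(AgentTask):
                def __init__(self, working_directory, owner):
                    super(ExampleTask, self).__init__()
                    self._dir = working_directory
                    self._owner = owner

                def _command_line(self):
                    # Command handed to the drone manager by run().
                    return ['/bin/true']

                def _working_directory(self):
                    return self._dir

                @property
                def owner_username(self):
                    return self._owner

                def prolog(self):
                    # Pre-process model-state changes go here.
                    super(ExampleTask, self).prolog()

                def epilog(self):
                    # By now self.success reflects the process exit code.
                    super(ExampleTask, self).epilog()
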
    107 """
    108 
    109 import logging
    110 import os
    111 import time
    112 import urllib
    113 
    114 import common
    115 
    116 from autotest_lib.client.common_lib import global_config
    117 from autotest_lib.client.common_lib import utils
    118 from autotest_lib.frontend.afe import models
    119 from autotest_lib.scheduler import drone_manager
    120 from autotest_lib.scheduler import email_manager
    121 from autotest_lib.scheduler import pidfile_monitor
    122 from autotest_lib.scheduler import rdb_lib
    123 from autotest_lib.scheduler import scheduler_lib
    124 from autotest_lib.scheduler import scheduler_models
    125 from autotest_lib.server import autoserv_utils
    126 from autotest_lib.server import system_utils
    127 
    128 try:
    129     from chromite.lib import metrics
    130 except ImportError:
    131     metrics = utils.metrics_mock
    132 
    133 
    134 CONFIG = global_config.global_config
    135 AUTOSERV_NICE_LEVEL = 10
    136 
    137 ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
    138         'CROS', 'enable_drone_in_restricted_subnet', type=bool,
    139         default=False)
    140 
    141 
    142 class AgentTask(object):
    143     class _NullMonitor(object):
    144         pidfile_id = None
    145 
    146         def has_process(self):
    147             return True
    148 
    149 
    150     def __init__(self, log_file_name=None):
    151         """
    152         @param log_file_name: (optional) name of file to log command output to
    153         """
    154         self._drone_manager = drone_manager.instance()
    155         self.done = False
    156         self.started = False
    157         self.success = None
    158         self.aborted = False
    159         self.monitor = None
    160         self.queue_entry_ids = []
    161         self.host_ids = []
    162         # A map between host id and hostname.
    163         self.hostnames = {}
    164         self._log_file_name = log_file_name
    165 
    166 
    167     def _set_ids(self, host=None, queue_entries=None):
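                """Record the host ids, hostnames and queue entry ids for the task."""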
    168         if queue_entries and queue_entries != [None]:
    169             self.host_ids = []
    170             self.queue_entry_ids = []
    171             self.hostnames = {}
    172             for entry in queue_entries:
    173                 if entry.host is not None:
    174                     self.host_ids.append(entry.host.id)
    175                     self.queue_entry_ids.append(entry.id)
    176                     self.hostnames[entry.host.id] = entry.host.hostname
    177                 else:
    178                     logging.debug(
    179                             'No host is found for host_queue_entry_id: %r',
    180                             entry.id)
    181                     raise scheduler_lib.NoHostIdError(
    182                             'Failed to schedule a job whose '
    183                             'host_queue_entry_id=%r due to no host_id.'
    184                             % entry.id)
    185         else:
    186             assert host
    187             self.host_ids = [host.id]
    188             self.hostnames = {host.id: host.hostname}
    189 
    190 
    191     def poll(self):
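                """Start the task on the first poll; afterwards tick until done."""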
    192         if not self.started:
    193             self.start()
    194         if not self.done:
    195             self.tick()
    196 
    197 
    198     def tick(self):
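                """Check the monitored process and finish the task once it exits."""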
    199         assert self.monitor
    200         exit_code = self.monitor.exit_code()
    201         if exit_code is None:
    202             return
    203 
    204         success = (exit_code == 0)
    205         self.finished(success)
    206 
    207 
    208     def is_done(self):
    209         return self.done
    210 
    211 
    212     def finished(self, success):
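                """Mark the task done, record success and run the epilog.

                No-op if the task is already done.
                """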
    213         if self.done:
    214             assert self.started
    215             return
    216         self.started = True
    217         self.done = True
    218         self.success = success
    219         self.epilog()
    220 
    221 
    222     def prolog(self):
    223         """
    224         To be overridden.
    225         """
    226         assert not self.monitor
    227         self.register_necessary_pidfiles()
    228 
    229 
    230     def _log_file(self):
    231         if not self._log_file_name:
    232             return None
    233         return os.path.join(self._working_directory(), self._log_file_name)
    234 
    235 
    236     def cleanup(self):
    237         log_file = self._log_file()
    238         if self.monitor and log_file:
    239             self.monitor.try_copy_to_results_repository(log_file)
    240 
    241 
    242     def epilog(self):
    243         """
    244         To be overridden.
    245         """
    246         self.cleanup()
    247         logging.info("%s finished with success=%s", type(self).__name__,
    248                      self.success)
    249 
    250 
    251     def start(self):
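                """Run the prolog and launch the process; set the started bit."""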
    252         if not self.started:
    253             self.prolog()
    254             self.run()
    255 
    256         self.started = True
    257 
    258 
    259     def abort(self):
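                """Kill any running process and mark the task done and aborted."""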
    260         if self.monitor:
    261             self.monitor.kill()
    262         self.done = True
    263         self.aborted = True
    264         self.cleanup()
    265 
    266 
    267     def _get_consistent_execution_path(self, execution_entries):
    268         first_execution_path = execution_entries[0].execution_path()
    269         for execution_entry in execution_entries[1:]:
    270             assert execution_entry.execution_path() == first_execution_path, (
    271                 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
    272                                         execution_entry,
    273                                         first_execution_path,
    274                                         execution_entries[0]))
    275         return first_execution_path
    276 
    277 
    278     def _copy_results(self, execution_entries, use_monitor=None):
    279         """
    280         @param execution_entries: list of objects with execution_path() method
    281         """
    282         if use_monitor is not None and not use_monitor.has_process():
    283             return
    284 
    285         assert len(execution_entries) > 0
    286         if use_monitor is None:
    287             assert self.monitor
    288             use_monitor = self.monitor
    289         assert use_monitor.has_process()
    290         execution_path = self._get_consistent_execution_path(execution_entries)
    291         results_path = execution_path + '/'
    292         use_monitor.try_copy_to_results_repository(results_path)
    293 
    294 
    295     def _parse_results(self, queue_entries):
    296         for queue_entry in queue_entries:
    297             queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
    298 
    299 
    300     def _command_line(self):
    301         """
    302         Return the command line to run.  Must be overridden.
    303         """
    304         raise NotImplementedError
    305 
    306 
    307     @property
    308     def num_processes(self):
    309         """
    310         Return the number of processes forked by this AgentTask's process.
    311         It may only be approximate.  To be overridden if necessary.
    312         """
    313         return 1
    314 
    315 
    316     def _paired_with_monitor(self):
    317         """
    318         If this AgentTask's process must run on the same machine as some
    319         previous process, this method should be overridden to return a
    320         PidfileRunMonitor for that process.
    321         """
    322         return self._NullMonitor()
    323 
    324 
    325     @property
    326     def owner_username(self):
    327         """
    328         Return login of user responsible for this task.  May be None.  Must be
    329         overridden.
    330         """
    331         raise NotImplementedError
    332 
    333 
    334     def _working_directory(self):
    335         """
    336         Return the directory where this AgentTask's process executes.
    337         Must be overridden.
    338         """
    339         raise NotImplementedError
    340 
    341 
    342     def _pidfile_name(self):
    343         """
    344         Return the name of the pidfile this AgentTask's process uses.  To be
    345         overridden if necessary.
    346         """
    347         return drone_manager.AUTOSERV_PID_FILE
    348 
    349 
    350     def _check_paired_results_exist(self):
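                """Return True if the paired results exist; otherwise fail the task."""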
    351         if not self._paired_with_monitor().has_process():
    352             metrics.Counter(
    353                 'chromeos/autotest/errors/scheduler/no_paired_results'
    354             ).increment()
    355             self.finished(False)
    356             return False
    357         return True
    358 
    359 
    360     def _create_monitor(self):
    361         assert not self.monitor
    362         self.monitor = pidfile_monitor.PidfileRunMonitor()
    363 
    364 
    365     def run(self):
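                """Start the task's process.

                Creates a PidfileRunMonitor and hands the command line to the
                drone manager, which launches the process on a drone.
                """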
    366         if not self._check_paired_results_exist():
    367             return
    368 
    369         self._create_monitor()
    370         self.monitor.run(
    371                 self._command_line(), self._working_directory(),
    372                 num_processes=self.num_processes,
    373                 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
    374                 pidfile_name=self._pidfile_name(),
    375                 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
    376                 username=self.owner_username,
    377                 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
    378 
    379 
    380     def get_drone_hostnames_allowed(
    381             self, restricted_subnets=utils.RESTRICTED_SUBNETS,
    382             enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
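                """Return the set of drone hostnames allowed to run this task.

                The result is constrained by restricted subnets and by drone
                sets (when enabled). None means any drone may be used; an empty
                set means no drone can run the task.
                """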
    383         filtered_drones = None
    384         has_unrestricted_host = False
    385         if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
    386             for hostname in self.hostnames.values():
    387                 subnet = utils.get_restricted_subnet(hostname,
    388                                                      restricted_subnets)
    389 
    390                 # Return an empty set if the hosts span both restricted
    391                 # and unrestricted subnets. No drone can work in such a
    392                 # case.
    393                 if ((not subnet and filtered_drones is not None) or
    394                     (subnet and has_unrestricted_host)):
    395                     logging.error('The test has some DUT in restricted subnet, '
    396                                   'but some in unrestricted subnet. Therefore, '
    397                                   'no drone is available to run the test.')
    398                     return set()
    399 
    400                 if not subnet:
    401                     has_unrestricted_host = True
    402                     continue
    403 
    404                 server_ip_map = system_utils.DroneCache.get_drone_ip_map()
    405                 filtered_drones_for_host = set(
    406                         utils.get_servers_in_same_subnet(
    407                                 subnet[0], subnet[1],
    408                                 server_ip_map=server_ip_map))
    409                 logging.info('DUT %s is in restricted subnet, drone can only '
    410                              'be chosen from %s', hostname,
    411                              filtered_drones_for_host)
    412                 if filtered_drones is None:
    413                     filtered_drones = filtered_drones_for_host
    414                 else:
    415                     filtered_drones = set.intersection(
    416                             filtered_drones, filtered_drones_for_host)
    417 
    418                 # If filtered_drones is an empty set, that means no drone is
    419                 # allowed to run the task. This is different from None, which
    420                 # means all drones are allowed.
    421                 if filtered_drones == set():
    422                     logging.error('DUT(s) is in restricted subnet, but no '
    423                                   'drone is available to run the test.')
    424                     return filtered_drones
    425 
    426         # If no host is in a restricted subnet, use only unrestricted drones.
    427         if (filtered_drones is None and restricted_subnets and
    428             enable_drone_in_subnet):
    429             filtered_drones = set(
    430                     system_utils.DroneCache.get_unrestricted_drones(
    431                             restricted_subnets=restricted_subnets))
    432 
    433         if not models.DroneSet.drone_sets_enabled():
    434             return filtered_drones
    435 
    436         hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
    437         if not hqes:
    438             # Only special tasks could be missing host queue entries
    439             assert isinstance(self, SpecialAgentTask)
    440             return self._user_or_global_default_drone_set(
    441                     self.task, self.task.requested_by)
    442 
    443         job_ids = hqes.values_list('job', flat=True).distinct()
    444         assert job_ids.count() == 1, ("AgentTask's queue entries "
    445                                       "span multiple jobs")
    446 
    447         job = models.Job.objects.get(id=job_ids[0])
    448         drone_set = job.drone_set
    449         if not drone_set:
    450             return self._user_or_global_default_drone_set(job, job.user())
    451 
    452         if filtered_drones:
    453             return set.intersection(filtered_drones,
    454                                     drone_set.get_drone_hostnames())
    455         else:
    456             return drone_set.get_drone_hostnames()
    457 
    458 
    459     def _user_or_global_default_drone_set(self, obj_with_owner, user):
    460         """
    461         Returns the user's default drone set, if present.
    462 
    463         Otherwise, returns the global default drone set.
    464         """
    465         default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
    466         if not user:
    467             logging.warning('%s had no owner; using default drone set',
    468                             obj_with_owner)
    469             return default_hostnames
    470         if not user.drone_set:
    471             logging.warning('User %s has no default drone set, using global '
    472                             'default', user.login)
    473             return default_hostnames
    474         return user.drone_set.get_drone_hostnames()
    475 
    476 
    477     def register_necessary_pidfiles(self):
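                """Register this task's pidfile(s) with the drone manager."""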
    478         pidfile_id = self._drone_manager.get_pidfile_id_from(
    479                 self._working_directory(), self._pidfile_name())
    480         self._drone_manager.register_pidfile(pidfile_id)
    481 
    482         paired_pidfile_id = self._paired_with_monitor().pidfile_id
    483         if paired_pidfile_id:
    484             self._drone_manager.register_pidfile(paired_pidfile_id)
    485 
    486 
    487     def recover(self):
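                """Attach to an existing process recorded in this task's pidfile.

                If no process is found, the monitor is cleared so the task will
                be started normally on the next poll.
                """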
    488         if not self._check_paired_results_exist():
    489             return
    490 
    491         self._create_monitor()
    492         self.monitor.attach_to_existing_process(
    493                 self._working_directory(), pidfile_name=self._pidfile_name(),
    494                 num_processes=self.num_processes)
    495         if not self.monitor.has_process():
    496             # no process to recover; wait to be started normally
    497             self.monitor = None
    498             return
    499 
    500         self.started = True
    501         logging.info('Recovering process %s for %s at %s',
    502                      self.monitor.get_process(), type(self).__name__,
    503                      self._working_directory())
    504 
    505 
    506     def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
    507                                     allowed_host_statuses=None):
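                """Abort jobs of entries whose HQE or host status is not allowed.

                Instead of raising an exception, this logs the error, sends a
                notification email and requests an abort of the offending job.
                """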
    508         class_name = self.__class__.__name__
    509         for entry in queue_entries:
    510             if entry.status not in allowed_hqe_statuses:
    511                 # In the original code, here we raise an exception. In an
    512                 # effort to prevent downtime we will instead abort the job and
    513                 # send out an email notifying us this has occurred.
    514                 error_message = ('%s attempting to start entry with invalid '
    515                                  'status %s: %s. Aborting Job: %s.'
    516                                  % (class_name, entry.status, entry,
    517                                     entry.job))
    518                 logging.error(error_message)
    519                 email_manager.manager.enqueue_notify_email(
    520                     'Job Aborted - Invalid Host Queue Entry Status',
    521                     error_message)
    522                 entry.job.request_abort()
    523             invalid_host_status = (
    524                     allowed_host_statuses is not None
    525                     and entry.host.status not in allowed_host_statuses)
    526             if invalid_host_status:
    527                 # In the original code, here we raise an exception. In an
    528                 # effort to prevent downtime we will instead abort the job and
    529                 # send out an email notifying us this has occurred.
    530                 error_message = ('%s attempting to start on queue entry with '
    531                                  'invalid host status %s: %s. Aborting Job: %s'
    532                                  % (class_name, entry.host.status, entry,
    533                                     entry.job))
    534                 logging.error(error_message)
    535                 email_manager.manager.enqueue_notify_email(
    536                     'Job Aborted - Invalid Host Status', error_message)
    537                 entry.job.request_abort()
    538 
    539 
    540 class TaskWithJobKeyvals(object):
    541     """AgentTask mixin providing functionality to help with job keyval files."""
    542     _KEYVAL_FILE = 'keyval'
    543     def _format_keyval(self, key, value):
    544         return '%s=%s' % (key, value)
    545 
    546 
    547     def _keyval_path(self):
    548         """Subclasses must override this"""
    549         raise NotImplementedError
    550 
    551 
    552     def _write_keyval_after_job(self, field, value):
    553         assert self.monitor
    554         if not self.monitor.has_process():
    555             return
    556         self._drone_manager.write_lines_to_file(
    557             self._keyval_path(), [self._format_keyval(field, value)],
    558             paired_with_process=self.monitor.get_process())
    559 
    560 
    561     def _job_queued_keyval(self, job):
    562         return 'job_queued', int(time.mktime(job.created_on.timetuple()))
    563 
    564 
    565     def _write_job_finished(self):
    566         self._write_keyval_after_job("job_finished", int(time.time()))
    567 
    568 
    569     def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
    570         keyval_contents = '\n'.join(self._format_keyval(key, value)
    571                                     for key, value in keyval_dict.iteritems())
    572         # always end with a newline to allow additional keyvals to be written
    573         keyval_contents += '\n'
    574         self._drone_manager.attach_file_to_execution(self._working_directory(),
    575                                                 keyval_contents,
    576                                                 file_path=keyval_path)
    577 
    578 
    579     def _write_keyvals_before_job(self, keyval_dict):
    580         self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
    581 
    582 
    583     def _write_host_keyvals(self, host):
    584         keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
    585                                    host.hostname)
    586         platform, all_labels = host.platform_and_labels()
    587         all_labels = [urllib.quote(label) for label in all_labels]
    588         keyval_dict = dict(platform=platform, labels=','.join(all_labels))
    589         self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
    590 
    591 
    592 class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    593     """
    594     Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    595     """
    596 
    597     TASK_TYPE = None
    598     host = None
    599     queue_entry = None
    600     _COUNT_METRIC = 'chromeos/autotest/scheduler/special_task_count'
    601     _DUT_METRIC = 'chromeos/autotest/scheduler/special_task_by_dut'
    602     _DURATION_METRIC = 'chromeos/autotest/scheduler/special_task_durations'
    603 
    604 
    605     def __init__(self, task, extra_command_args):
    606         super(SpecialAgentTask, self).__init__()
    607 
    608         assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
    609 
    610         self.host = rdb_lib.get_hosts([task.host.id])[0]
    611         self.host.dbg_str = 'Task: %s' % str(task)
    612         self.queue_entry = None
    613         if task.queue_entry:
    614             self.queue_entry = scheduler_models.HostQueueEntry(
    615                     id=task.queue_entry.id)
    616             self.host.dbg_str += self.queue_entry.get_dbg_str()
    617 
    618         # This is of type SpecialTask (as defined in frontend/afe/models.py)
    619         self.task = task
    620         self._extra_command_args = extra_command_args
    621         self.host.metadata = self.get_metadata()
    622         self._milestone = ''
    623 
    624 
    625     def get_metadata(self):
    626         """Get a dictionary that contains task information.
    627 
    628         The return value is a dictionary that includes task information like id,
    629         name and related job information. The value will be stored in the
    630         metadata database.
    631         @return: A dictionary containing the task id, name and related job id.
    632                  If some attributes cannot be accessed, an empty
    633                  dictionary will be returned and an error will be logged.
    634         """
    635         try:
    636             metadata = {'task_id': self.task.id, 'task_name': self.task.task,
    637                         'hostname': self.task.host.hostname}
    638             if self.task.queue_entry:
    639                 job = self.task.queue_entry.job
    640                 metadata.update(
    641                         scheduler_models.get_job_metadata(job))
    642             return metadata
    643         except AttributeError as e:
    644             logging.error('Task has missing attribute: %s', e)
    645             return {}
    646 
    647 
    648     def _keyval_path(self):
    649         return os.path.join(self._working_directory(), self._KEYVAL_FILE)
    650 
    651 
    652     def _command_line(self):
    653         return autoserv_utils._autoserv_command_line(self.host.hostname,
    654                                                      self._extra_command_args,
    655                                                      queue_entry=self.queue_entry,
    656                                                      in_lab=True)
    657 
    658 
    659     def _working_directory(self):
    660         return self.task.execution_path()
    661 
    662 
    663     @property
    664     def owner_username(self):
    665         if self.task.requested_by:
    666             return self.task.requested_by.login
    667         return None
    668 
    669 
    670     def prolog(self):
    671         super(SpecialAgentTask, self).prolog()
    672         self.task.activate()
    673         self._write_host_keyvals(self.host)
    674 
    675 
    676     def _fail_queue_entry(self):
    677         assert self.queue_entry
    678 
    679         if self.queue_entry.meta_host:
    680             return # don't fail metahost entries, they'll be reassigned
    681 
    682         self.queue_entry.update_from_database()
    683         if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
    684             return # entry has been aborted
    685 
    686         self._actually_fail_queue_entry()
    687 
    688 
    689     def epilog(self):
    690         super(SpecialAgentTask, self).epilog()
    691         self._emit_special_task_status_metric()
    692 
    693 
    694     def _emit_special_task_status_metric(self):
    695         """Increments an accumulator associated with this special task."""
    696         fields = {'type': self.TASK_TYPE,
    697                   'success': bool(self.success),
    698                   'board': str(self.host.board),
    699                   'milestone': self._milestone}
    700         metrics.Counter(self._COUNT_METRIC).increment(
    701             fields=fields)
    702 
    703         if (self.task.time_finished and self.task.time_started):
    704             duration = (self.task.time_finished -
    705                         self.task.time_started).total_seconds()
    706             metrics.SecondsDistribution(self._DURATION_METRIC).add(
    707                 duration, fields=fields)
    708 
    709         dut_fields = {
    710             'type': self.TASK_TYPE,
    711             'success': bool(self.success),
    712             'board': str(self.host.board),
    713             'dut_host_name': self.host.hostname
    714         }
    715         metrics.Counter(self._DUT_METRIC).increment(fields=dut_fields)
    716 
    717     # TODO(milleral): http://crbug.com/268607
    718     # All this used to be a part of _fail_queue_entry.  The
    719     # exact semantics of when one should and should not be failing a queue
    720     # entry need to be worked out, because provisioning has placed us in a
    721     # case where we want to fail a queue entry that could be requeued,
    722     # which makes us fail the two above if statements, and thus
    723     # _fail_queue_entry() would exit early and have no effect.
    724     # What's left here with _actually_fail_queue_entry is a hack to be able to
    725     # bypass the checks and unconditionally execute the code.
    726     def _actually_fail_queue_entry(self):
    727         self.queue_entry.set_execution_subdir()
    728         queued_key, queued_time = self._job_queued_keyval(
    729             self.queue_entry.job)
    730         self._write_keyval_after_job(queued_key, queued_time)
    731         self._write_job_finished()
    732 
    733         # copy results logs into the normal place for job results
    734         self.monitor.try_copy_results_on_drone(
    735                 source_path=self._working_directory() + '/',
    736                 destination_path=self.queue_entry.execution_path() + '/')
    737 
    738         pidfile_id = self._drone_manager.get_pidfile_id_from(
    739                 self.queue_entry.execution_path(),
    740                 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
    741         self._drone_manager.register_pidfile(pidfile_id)
    742 
    743         # TODO(ayatane): This should obey self.queue_entry.job.parse_failed_repair
    744         # But nothing sets self.queue_entry.job.parse_failed_repair?
    745         # Check Git blame
    746         self._parse_results([self.queue_entry])
    747 
    748         # Also fail all other special tasks that have not yet run for this HQE
    749         pending_tasks = models.SpecialTask.objects.filter(
    750                 queue_entry__id=self.queue_entry.id,
    751                 is_complete=0)
    752         for task in pending_tasks:
    753             task.finish(False)
    754 
    755 
    756     def cleanup(self):
    757         super(SpecialAgentTask, self).cleanup()
    758 
    759         # We will consider an aborted task to be "Failed"
    760         self.task.finish(bool(self.success))
    761 
    762         if self.monitor:
    763             if self.monitor.has_process():
    764                 self._copy_results([self.task])
    765             if self.monitor.pidfile_id is not None:
    766                 self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)
    767 
    768 
    769     def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
    770         """Remove queued special tasks of one type, keep the last if needed.
    771 
    772         @param special_task_to_remove: type of special task to be removed, e.g.,
    773             models.SpecialTask.Task.VERIFY.
    774         @param keep_last_one: True to keep the last special task if its type is
    775             the same as special_task_to_remove.
    776 
    777         """
    778         queued_special_tasks = models.SpecialTask.objects.filter(
    779             host__id=self.host.id,
    780             task=special_task_to_remove,
    781             is_active=False, is_complete=False, queue_entry=None)
    782         if keep_last_one:
    783             queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
    784         queued_special_tasks.delete()
    785 
    786 
    787     def _generate_autoserv_label_args(self, task):
    788         """
    789         @param task: An instance of afe model's SpecialTask.
    790         @returns: The list of arguments to pass to autoserv to tell it what the
    791                   labels of a job are.
    792 
    793         """
    794         labels = {x.name for x in task.queue_entry.job.labels}
    795         return ['--job-labels', ','.join(labels)]
    796