# (code-viewer navigation residue, not source): Home | History | Annotate | Download | only in contrib
      1 #!/usr/bin/python
      2 """
      3 Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile
      4     If the job_id is a suite it will find all subjobs.
      5 You need to change the location of the log it will parse.
      6 The job_id needs to be in the afe database.
      7 """
      8 import abc
      9 import datetime
     10 import os
     11 import re
     12 import pprint
     13 import subprocess
     14 import sys
     15 import time
     16 
     17 import common
     18 from autotest_lib.server import frontend
     19 
     20 
     21 LOGFIE = './logs/scheduler.log.2014-04-17-16.51.47'
     22 # logfile name format: scheduler.log.2014-02-14-18.10.56
     23 time_format = '%Y-%m-%d-%H.%M.%S'
     24 logfile_regex = r'scheduler.log.([0-9,.,-]+)'
     25 logdir = os.path.join('/usr/local/autotest', 'logs')
     26 
class StateMachineViolation(Exception):
    """Raised when a log records a state transition the SMP did not expect."""
    pass
     29 
     30 
class LogLineException(Exception):
    """Raised for errors while parsing individual log lines."""
    pass
     33 
     34 
     35 def should_process_log(time_str, time_format, cutoff_days=7):
     36     """Returns true if the logs was created after cutoff days.
     37 
     38     @param time_str: A string representing the time.
     39         eg: 2014-02-14-18.10.56
     40     @param time_format: A string representing the format of the time string.
     41         ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
     42     @param cutoff_days: Int representind the cutoff in days.
     43 
     44     @return: Returns True if time_str has aged more than cutoff_days.
     45     """
     46     log_time = datetime.datetime.strptime(time_str, time_format)
     47     now = datetime.datetime.strptime(time.strftime(time_format), time_format)
     48     cutoff = now - datetime.timedelta(days=cutoff_days)
     49     return log_time < cutoff
     50 
     51 
     52 def apply_regex(regex, line):
     53     """Simple regex applicator.
     54 
     55     @param regex: Regex to apply.
     56     @param line: The line to apply regex on.
     57 
     58     @return: A tuple with the matching groups, if there was a match.
     59     """
     60     log_match  = re.match(regex, line)
     61     if log_match:
     62         return log_match.groups()
     63 
     64 
     65 class StateMachineParser(object):
     66     """Abstract class that enforces state transition ordering.
     67 
     68     Classes inheriting from StateMachineParser need to define an
     69     expected_transitions dictionary. The SMP will pop 'to' states
     70     from the dictionary as they occur, so you cannot same state transitions
     71     unless you specify 2 of them.
     72     """
     73     __metaclass__ = abc.ABCMeta
     74 
     75 
     76     @abc.abstractmethod
     77     def __init__(self):
     78         self.visited_states = []
     79         self.expected_transitions = {}
     80 
     81 
     82     def advance_state(self, from_state, to_state):
     83         """Checks that a transition is valid.
     84 
     85         @param from_state: A string representind the state the host is leaving.
     86         @param to_state: The state The host is going to, represented as a string.
     87 
     88         @raises LogLineException: If an invalid state transition was
     89             detected.
     90         """
     91         # TODO: Updating to the same state is a waste of bw.
     92         if from_state and from_state == to_state:
     93             return ('Updating to the same state is a waste of BW: %s->%s' %
     94                     (from_state, to_state))
     95             return
     96 
     97         if (from_state in self.expected_transitions and
     98             to_state in self.expected_transitions[from_state]):
     99             self.expected_transitions[from_state].remove(to_state)
    100             self.visited_states.append(to_state)
    101         else:
    102             return (from_state, to_state)
    103 
    104 
    105 class SingleJobHostSMP(StateMachineParser):
    106     def __init__(self):
    107         self.visited_states = []
    108         self.expected_transitions = {
    109                 'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
    110                 'Resetting': ['Ready', 'Provisioning'],
    111                 'Pending': ['Running'],
    112                 'Provisioning': ['Repairing'],
    113                 'Running': ['Ready']
    114         }
    115 
    116 
    117     def check_transitions(self, hostline):
    118         if hostline.line_info['field'] == 'status':
    119             self.advance_state(hostline.line_info['state'],
    120                     hostline.line_info['value'])
    121 
    122 
    123 class SingleJobHqeSMP(StateMachineParser):
    124     def __init__(self):
    125         self.visited_states = []
    126         self.expected_transitions = {
    127                 'Queued': ['Starting', 'Resetting', 'Aborted'],
    128                 'Resetting': ['Pending', 'Provisioning'],
    129                 'Provisioning': ['Pending', 'Queued', 'Repairing'],
    130                 'Pending': ['Starting'],
    131                 'Starting': ['Running'],
    132                 'Running': ['Gathering', 'Parsing'],
    133                 'Gathering': ['Parsing'],
    134                 'Parsing': ['Completed', 'Aborted']
    135         }
    136 
    137 
    138     def check_transitions(self, hqeline):
    139         invalid_states = self.advance_state(
    140                 hqeline.line_info['from_state'], hqeline.line_info['to_state'])
    141         if not invalid_states:
    142             return
    143 
    144         # Deal with repair.
    145         if (invalid_states[0] == 'Queued' and
    146             'Running' in self.visited_states):
    147             raise StateMachineViolation('Unrecognized state transition '
    148                     '%s->%s, expected transitions are %s' %
    149                     (invalid_states[0], invalid_states[1],
    150                      self.expected_transitions))
    151 
    152 
    153 class LogLine(object):
    154     """Line objects.
    155 
    156     All classes inheriting from LogLine represent a line of some sort.
    157     A line is responsible for parsing itself, and invoking an SMP to
    158     validate state transitions. A line can be part of several state machines.
    159     """
    160     line_format = '%s'
    161 
    162 
    163     def __init__(self, state_machine_parsers):
    164         """
    165         @param state_machine_parsers: A list of smp objects to use to validate
    166             state changes on these types of lines..
    167         """
    168         self.smps = state_machine_parsers
    169 
    170         # Because, this is easier to flush.
    171         self.line_info = {}
    172 
    173 
    174     def parse_line(self, line):
    175         """Apply a line regex and save any information the parsed line contains.
    176 
    177         @param line: A string representing a line.
    178         """
    179         # Regex for all the things.
    180         line_rgx = '(.*)'
    181         parsed_line = apply_regex(line_rgx, line)
    182         if parsed_line:
    183             self.line_info['line'] = parsed_line[0]
    184 
    185 
    186     def flush(self):
    187         """Call any state machine parsers, persist line info if needed.
    188         """
    189         for smp in self.smps:
    190             smp.check_transitions(self)
    191         # TODO: persist this?
    192         self.line_info={}
    193 
    194 
    195     def format_line(self):
    196         try:
    197             return self.line_format % self.line_info
    198         except KeyError:
    199             return self.line_info['line']
    200 
    201 
    202 class TimeLine(LogLine):
    203     """Filters timestamps for scheduler logs.
    204     """
    205 
    206     def parse_line(self, line):
    207         super(TimeLine, self).parse_line(line)
    208 
    209         # Regex for isolating the date and time from scheduler logs, eg:
    210         # 02/16 16:04:36.573 INFO |scheduler_:0574|...
    211         line_rgx = '([0-9,/,:,., ]+)(.*)'
    212         parsed_line = apply_regex(line_rgx, self.line_info['line'])
    213         if parsed_line:
    214             self.line_info['time'] = parsed_line[0]
    215             self.line_info['line'] = parsed_line[1]
    216 
    217 
    218 class HostLine(TimeLine):
    219     """Manages hosts line parsing.
    220     """
    221     line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
    222                 'updated %(field)s->%(value)s')
    223 
    224 
    225     def record_state_transition(self, line):
    226         """Apply the state_transition_rgx to a line and record state changes.
    227 
    228         @param line: The line we're expecting to contain a state transition.
    229         """
    230         state_transition_rgx = ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*"
    231         match = apply_regex(state_transition_rgx, line)
    232         if match:
    233             self.line_info['state'] = match[0]
    234             self.line_info['field'] = match[1]
    235             self.line_info['value'] = match[2].replace("'", "")
    236 
    237 
    238     def parse_line(self, line):
    239         super(HostLine, self).parse_line(line)
    240 
    241         # Regex for getting host status. Eg:
    242         # 172.22.4 in Running updating {'status': 'Running'}
    243         line_rgx = '.*Host (([0-9,.,a-z,-]+).*)'
    244         parsed_line = apply_regex(line_rgx, self.line_info['line'])
    245         if parsed_line:
    246             self.line_info['line'] = parsed_line[0]
    247             self.line_info['host'] = parsed_line[1]
    248             self.record_state_transition(self.line_info['line'])
    249             return self.format_line()
    250 
    251 
    252 class HQELine(TimeLine):
    253     """Manages HQE line parsing.
    254     """
    255     line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
    256             'updated to %(to_state)s. Flags: %(flags)s')
    257 
    258 
    259     def record_state_transition(self, line):
    260         """Apply the state_transition_rgx to a line and record state changes.
    261 
    262         @param line: The line we're expecting to contain a state transition.
    263         """
    264         # Regex for getting hqe status. Eg:
    265         # status:Running [active] -> Gathering
    266         state_transition_rgx = ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)"
    267         match = apply_regex(state_transition_rgx, line)
    268         if match:
    269             self.line_info['from_state'] = match[0]
    270             self.line_info['flags'] = match[1]
    271             self.line_info['to_state'] = match[2]
    272 
    273 
    274     def parse_line(self, line):
    275         super(HQELine, self).parse_line(line)
    276         line_rgx = r'.*\| HQE: (([0-9]+).*)'
    277         parsed_line = apply_regex(line_rgx, self.line_info['line'])
    278         if parsed_line:
    279             self.line_info['line'] = parsed_line[0]
    280             self.line_info['hqe'] = parsed_line[1]
    281             self.record_state_transition(self.line_info['line'])
    282             return self.format_line()
    283 
    284 
    285 class LogCrawler(object):
    286     """Crawl logs.
    287 
    288     Log crawlers are meant to apply some basic preprocessing to a log, and crawl
    289     the output validating state changes. They manage line and state machine
    290     creation. The initial filtering applied to the log needs to be grab all lines
    291     that match an action, such as the running of a job.
    292     """
    293 
    294     def __init__(self, log_name):
    295         self.log = log_name
    296         self.filter_command = 'cat %s' % log_name
    297 
    298 
    299     def preprocess_log(self):
    300         """Apply some basic filtering to the log.
    301         """
    302         proc = subprocess.Popen(self.filter_command,
    303                 shell=True, stdout=subprocess.PIPE)
    304         out, err = proc.communicate()
    305         return out
    306 
    307 
    308 class SchedulerLogCrawler(LogCrawler):
    309     """A log crawler for the scheduler logs.
    310 
    311     This crawler is only capable of processing information about a single job.
    312     """
    313 
    314     def __init__(self, log_name, **kwargs):
    315         super(SchedulerLogCrawler, self).__init__(log_name)
    316         self.job_id = kwargs['job_id']
    317         self.line_processors = [HostLine([SingleJobHostSMP()]),
    318                 HQELine([SingleJobHqeSMP()])]
    319         self.filter_command = ('%s | grep "for job: %s"' %
    320                 (self.filter_command, self.job_id))
    321 
    322 
    323     def parse_log(self):
    324         """Parse each line of the preprocessed log output.
    325 
    326         Pass each line through each possible line_processor. The one that matches
    327         will populate itself, call flush, this will walk the state machine of that
    328         line to the next step.
    329         """
    330         out = self.preprocess_log()
    331         response = []
    332         for job_line in out.split('\n'):
    333             parsed_line = None
    334             for processor in self.line_processors:
    335                 line = processor.parse_line(job_line)
    336                 if line and parsed_line:
    337                     raise LogLineException('Multiple Parsers claiming the line %s: '
    338                             'previous parsing: %s, current parsing: %s ' %
    339                             (job_line, parsed_line, line))
    340                 elif line:
    341                     parsed_line = line
    342                     try:
    343                         processor.flush()
    344                     except StateMachineViolation as e:
    345                         response.append(str(e))
    346                         raise StateMachineViolation(response)
    347             response.append(parsed_line if parsed_line else job_line)
    348         return response
    349 
    350 
    351 def process_logs():
    352     if len(sys.argv) < 2:
    353         print ('Usage: ./cron_scripts/log_distiller.py 0 8415620 '
    354                'You need to change the location of the log it will parse.'
    355                 'The job_id needs to be in the afe database.')
    356         sys.exit(1)
    357 
    358     job_id = int(sys.argv[1])
    359     rpc = frontend.AFE()
    360     suite_jobs = rpc.run('get_jobs', id=job_id)
    361     if not suite_jobs[0]['parent_job']:
    362         suite_jobs = rpc.run('get_jobs', parent_job=job_id)
    363     try:
    364         logfile = sys.argv[2]
    365     except Exception:
    366         logfile = LOGFILE
    367 
    368     for job in suite_jobs:
    369         log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
    370         for line in log_crawler.parse_log():
    371             print line
    372     return
    373 
    374 
    375 if __name__ == '__main__':
    376     process_logs()
    377