1 #!/usr/bin/python 2 """ 3 Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile 4 If the job_id is a suite it will find all subjobs. 5 You need to change the location of the log it will parse. 6 The job_id needs to be in the afe database. 7 """ 8 import abc 9 import datetime 10 import os 11 import re 12 import pprint 13 import subprocess 14 import sys 15 import time 16 17 import common 18 from autotest_lib.server import frontend 19 20 21 LOGFIE = './logs/scheduler.log.2014-04-17-16.51.47' 22 # logfile name format: scheduler.log.2014-02-14-18.10.56 23 time_format = '%Y-%m-%d-%H.%M.%S' 24 logfile_regex = r'scheduler.log.([0-9,.,-]+)' 25 logdir = os.path.join('/usr/local/autotest', 'logs') 26 27 class StateMachineViolation(Exception): 28 pass 29 30 31 class LogLineException(Exception): 32 pass 33 34 35 def should_process_log(time_str, time_format, cutoff_days=7): 36 """Returns true if the logs was created after cutoff days. 37 38 @param time_str: A string representing the time. 39 eg: 2014-02-14-18.10.56 40 @param time_format: A string representing the format of the time string. 41 ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior 42 @param cutoff_days: Int representind the cutoff in days. 43 44 @return: Returns True if time_str has aged more than cutoff_days. 45 """ 46 log_time = datetime.datetime.strptime(time_str, time_format) 47 now = datetime.datetime.strptime(time.strftime(time_format), time_format) 48 cutoff = now - datetime.timedelta(days=cutoff_days) 49 return log_time < cutoff 50 51 52 def apply_regex(regex, line): 53 """Simple regex applicator. 54 55 @param regex: Regex to apply. 56 @param line: The line to apply regex on. 57 58 @return: A tuple with the matching groups, if there was a match. 59 """ 60 log_match = re.match(regex, line) 61 if log_match: 62 return log_match.groups() 63 64 65 class StateMachineParser(object): 66 """Abstract class that enforces state transition ordering. 67 68 Classes inheriting from StateMachineParser need to define an 69 expected_transitions dictionary. The SMP will pop 'to' states 70 from the dictionary as they occur, so you cannot same state transitions 71 unless you specify 2 of them. 72 """ 73 __metaclass__ = abc.ABCMeta 74 75 76 @abc.abstractmethod 77 def __init__(self): 78 self.visited_states = [] 79 self.expected_transitions = {} 80 81 82 def advance_state(self, from_state, to_state): 83 """Checks that a transition is valid. 84 85 @param from_state: A string representind the state the host is leaving. 86 @param to_state: The state The host is going to, represented as a string. 87 88 @raises LogLineException: If an invalid state transition was 89 detected. 90 """ 91 # TODO: Updating to the same state is a waste of bw. 92 if from_state and from_state == to_state: 93 return ('Updating to the same state is a waste of BW: %s->%s' % 94 (from_state, to_state)) 95 return 96 97 if (from_state in self.expected_transitions and 98 to_state in self.expected_transitions[from_state]): 99 self.expected_transitions[from_state].remove(to_state) 100 self.visited_states.append(to_state) 101 else: 102 return (from_state, to_state) 103 104 105 class SingleJobHostSMP(StateMachineParser): 106 def __init__(self): 107 self.visited_states = [] 108 self.expected_transitions = { 109 'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'], 110 'Resetting': ['Ready', 'Provisioning'], 111 'Pending': ['Running'], 112 'Provisioning': ['Repairing'], 113 'Running': ['Ready'] 114 } 115 116 117 def check_transitions(self, hostline): 118 if hostline.line_info['field'] == 'status': 119 self.advance_state(hostline.line_info['state'], 120 hostline.line_info['value']) 121 122 123 class SingleJobHqeSMP(StateMachineParser): 124 def __init__(self): 125 self.visited_states = [] 126 self.expected_transitions = { 127 'Queued': ['Starting', 'Resetting', 'Aborted'], 128 'Resetting': ['Pending', 'Provisioning'], 129 'Provisioning': ['Pending', 'Queued', 'Repairing'], 130 'Pending': ['Starting'], 131 'Starting': ['Running'], 132 'Running': ['Gathering', 'Parsing'], 133 'Gathering': ['Parsing'], 134 'Parsing': ['Completed', 'Aborted'] 135 } 136 137 138 def check_transitions(self, hqeline): 139 invalid_states = self.advance_state( 140 hqeline.line_info['from_state'], hqeline.line_info['to_state']) 141 if not invalid_states: 142 return 143 144 # Deal with repair. 145 if (invalid_states[0] == 'Queued' and 146 'Running' in self.visited_states): 147 raise StateMachineViolation('Unrecognized state transition ' 148 '%s->%s, expected transitions are %s' % 149 (invalid_states[0], invalid_states[1], 150 self.expected_transitions)) 151 152 153 class LogLine(object): 154 """Line objects. 155 156 All classes inheriting from LogLine represent a line of some sort. 157 A line is responsible for parsing itself, and invoking an SMP to 158 validate state transitions. A line can be part of several state machines. 159 """ 160 line_format = '%s' 161 162 163 def __init__(self, state_machine_parsers): 164 """ 165 @param state_machine_parsers: A list of smp objects to use to validate 166 state changes on these types of lines.. 167 """ 168 self.smps = state_machine_parsers 169 170 # Because, this is easier to flush. 171 self.line_info = {} 172 173 174 def parse_line(self, line): 175 """Apply a line regex and save any information the parsed line contains. 176 177 @param line: A string representing a line. 178 """ 179 # Regex for all the things. 180 line_rgx = '(.*)' 181 parsed_line = apply_regex(line_rgx, line) 182 if parsed_line: 183 self.line_info['line'] = parsed_line[0] 184 185 186 def flush(self): 187 """Call any state machine parsers, persist line info if needed. 188 """ 189 for smp in self.smps: 190 smp.check_transitions(self) 191 # TODO: persist this? 192 self.line_info={} 193 194 195 def format_line(self): 196 try: 197 return self.line_format % self.line_info 198 except KeyError: 199 return self.line_info['line'] 200 201 202 class TimeLine(LogLine): 203 """Filters timestamps for scheduler logs. 204 """ 205 206 def parse_line(self, line): 207 super(TimeLine, self).parse_line(line) 208 209 # Regex for isolating the date and time from scheduler logs, eg: 210 # 02/16 16:04:36.573 INFO |scheduler_:0574|... 211 line_rgx = '([0-9,/,:,., ]+)(.*)' 212 parsed_line = apply_regex(line_rgx, self.line_info['line']) 213 if parsed_line: 214 self.line_info['time'] = parsed_line[0] 215 self.line_info['line'] = parsed_line[1] 216 217 218 class HostLine(TimeLine): 219 """Manages hosts line parsing. 220 """ 221 line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, ' 222 'updated %(field)s->%(value)s') 223 224 225 def record_state_transition(self, line): 226 """Apply the state_transition_rgx to a line and record state changes. 227 228 @param line: The line we're expecting to contain a state transition. 229 """ 230 state_transition_rgx = ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*" 231 match = apply_regex(state_transition_rgx, line) 232 if match: 233 self.line_info['state'] = match[0] 234 self.line_info['field'] = match[1] 235 self.line_info['value'] = match[2].replace("'", "") 236 237 238 def parse_line(self, line): 239 super(HostLine, self).parse_line(line) 240 241 # Regex for getting host status. Eg: 242 # 172.22.4 in Running updating {'status': 'Running'} 243 line_rgx = '.*Host (([0-9,.,a-z,-]+).*)' 244 parsed_line = apply_regex(line_rgx, self.line_info['line']) 245 if parsed_line: 246 self.line_info['line'] = parsed_line[0] 247 self.line_info['host'] = parsed_line[1] 248 self.record_state_transition(self.line_info['line']) 249 return self.format_line() 250 251 252 class HQELine(TimeLine): 253 """Manages HQE line parsing. 254 """ 255 line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, ' 256 'updated to %(to_state)s. Flags: %(flags)s') 257 258 259 def record_state_transition(self, line): 260 """Apply the state_transition_rgx to a line and record state changes. 261 262 @param line: The line we're expecting to contain a state transition. 263 """ 264 # Regex for getting hqe status. Eg: 265 # status:Running [active] -> Gathering 266 state_transition_rgx = ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)" 267 match = apply_regex(state_transition_rgx, line) 268 if match: 269 self.line_info['from_state'] = match[0] 270 self.line_info['flags'] = match[1] 271 self.line_info['to_state'] = match[2] 272 273 274 def parse_line(self, line): 275 super(HQELine, self).parse_line(line) 276 line_rgx = r'.*\| HQE: (([0-9]+).*)' 277 parsed_line = apply_regex(line_rgx, self.line_info['line']) 278 if parsed_line: 279 self.line_info['line'] = parsed_line[0] 280 self.line_info['hqe'] = parsed_line[1] 281 self.record_state_transition(self.line_info['line']) 282 return self.format_line() 283 284 285 class LogCrawler(object): 286 """Crawl logs. 287 288 Log crawlers are meant to apply some basic preprocessing to a log, and crawl 289 the output validating state changes. They manage line and state machine 290 creation. The initial filtering applied to the log needs to be grab all lines 291 that match an action, such as the running of a job. 292 """ 293 294 def __init__(self, log_name): 295 self.log = log_name 296 self.filter_command = 'cat %s' % log_name 297 298 299 def preprocess_log(self): 300 """Apply some basic filtering to the log. 301 """ 302 proc = subprocess.Popen(self.filter_command, 303 shell=True, stdout=subprocess.PIPE) 304 out, err = proc.communicate() 305 return out 306 307 308 class SchedulerLogCrawler(LogCrawler): 309 """A log crawler for the scheduler logs. 310 311 This crawler is only capable of processing information about a single job. 312 """ 313 314 def __init__(self, log_name, **kwargs): 315 super(SchedulerLogCrawler, self).__init__(log_name) 316 self.job_id = kwargs['job_id'] 317 self.line_processors = [HostLine([SingleJobHostSMP()]), 318 HQELine([SingleJobHqeSMP()])] 319 self.filter_command = ('%s | grep "for job: %s"' % 320 (self.filter_command, self.job_id)) 321 322 323 def parse_log(self): 324 """Parse each line of the preprocessed log output. 325 326 Pass each line through each possible line_processor. The one that matches 327 will populate itself, call flush, this will walk the state machine of that 328 line to the next step. 329 """ 330 out = self.preprocess_log() 331 response = [] 332 for job_line in out.split('\n'): 333 parsed_line = None 334 for processor in self.line_processors: 335 line = processor.parse_line(job_line) 336 if line and parsed_line: 337 raise LogLineException('Multiple Parsers claiming the line %s: ' 338 'previous parsing: %s, current parsing: %s ' % 339 (job_line, parsed_line, line)) 340 elif line: 341 parsed_line = line 342 try: 343 processor.flush() 344 except StateMachineViolation as e: 345 response.append(str(e)) 346 raise StateMachineViolation(response) 347 response.append(parsed_line if parsed_line else job_line) 348 return response 349 350 351 def process_logs(): 352 if len(sys.argv) < 2: 353 print ('Usage: ./cron_scripts/log_distiller.py 0 8415620 ' 354 'You need to change the location of the log it will parse.' 355 'The job_id needs to be in the afe database.') 356 sys.exit(1) 357 358 job_id = int(sys.argv[1]) 359 rpc = frontend.AFE() 360 suite_jobs = rpc.run('get_jobs', id=job_id) 361 if not suite_jobs[0]['parent_job']: 362 suite_jobs = rpc.run('get_jobs', parent_job=job_id) 363 try: 364 logfile = sys.argv[2] 365 except Exception: 366 logfile = LOGFILE 367 368 for job in suite_jobs: 369 log_crawler = SchedulerLogCrawler(logfile, job_id=job['id']) 370 for line in log_crawler.parse_log(): 371 print line 372 return 373 374 375 if __name__ == '__main__': 376 process_logs() 377