Home | History | Annotate | Download | only in common
      1 # -*- coding: utf-8 -*-
      2 #
      3 # Copyright 2011 Google Inc. All Rights Reserved.
      4 #
      5 """Tools for recording and reporting timeline of abstract events.
      6 
      7 You can store any events provided that they can be stringified.
      8 """
      9 
     10 __author__ = 'kbaclawski (at] google.com (Krystian Baclawski)'
     11 
     12 import collections
     13 import datetime
     14 import time
     15 
     16 
     17 class _EventRecord(object):
     18   """Internal class.  Attaches extra information to an event."""
     19 
     20   def __init__(self, event, time_started=None, time_elapsed=None):
     21     self._event = event
     22     self._time_started = time_started or time.time()
     23     self._time_elapsed = None
     24 
     25     if time_elapsed:
     26       self.time_elapsed = time_elapsed
     27 
     28   @property
     29   def event(self):
     30     return self._event
     31 
     32   @property
     33   def time_started(self):
     34     return self._time_started
     35 
     36   def _TimeElapsedGet(self):
     37     if self.has_finished:
     38       time_elapsed = self._time_elapsed
     39     else:
     40       time_elapsed = time.time() - self._time_started
     41 
     42     return datetime.timedelta(seconds=time_elapsed)
     43 
     44   def _TimeElapsedSet(self, time_elapsed):
     45     if isinstance(time_elapsed, datetime.timedelta):
     46       self._time_elapsed = time_elapsed.seconds
     47     else:
     48       self._time_elapsed = time_elapsed
     49 
     50   time_elapsed = property(_TimeElapsedGet, _TimeElapsedSet)
     51 
     52   @property
     53   def has_finished(self):
     54     return self._time_elapsed is not None
     55 
     56   def GetTimeStartedFormatted(self):
     57     return time.strftime('%m/%d/%Y %H:%M:%S', time.gmtime(self._time_started))
     58 
     59   def GetTimeElapsedRounded(self):
     60     return datetime.timedelta(seconds=int(self.time_elapsed.seconds))
     61 
     62   def Finish(self):
     63     if not self.has_finished:
     64       self._time_elapsed = time.time() - self._time_started
     65 
     66 
     67 class _Transition(collections.namedtuple('_Transition', ('from_', 'to_'))):
     68   """Internal class.  Represents transition point between events / states."""
     69 
     70   def __str__(self):
     71     return '%s => %s' % (self.from_, self.to_)
     72 
     73 
     74 class EventHistory(collections.Sequence):
     75   """Records events and provides human readable events timeline."""
     76 
     77   def __init__(self, records=None):
     78     self._records = records or []
     79 
     80   def __len__(self):
     81     return len(self._records)
     82 
     83   def __iter__(self):
     84     return iter(self._records)
     85 
     86   def __getitem__(self, index):
     87     return self._records[index]
     88 
     89   @property
     90   def last(self):
     91     if self._records:
     92       return self._records[-1]
     93 
     94   def AddEvent(self, event):
     95     if self.last:
     96       self.last.Finish()
     97 
     98     evrec = _EventRecord(event)
     99     self._records.append(evrec)
    100     return evrec
    101 
    102   def GetTotalTime(self):
    103     if self._records:
    104       total_time_elapsed = sum(evrec.time_elapsed.seconds
    105                                for evrec in self._records)
    106 
    107       return datetime.timedelta(seconds=int(total_time_elapsed))
    108 
    109   def GetTransitionEventHistory(self):
    110     records = []
    111 
    112     if self._records:
    113       for num, next_evrec in enumerate(self._records[1:], start=1):
    114         evrec = self._records[num - 1]
    115 
    116         records.append(_EventRecord(
    117             _Transition(evrec.event, next_evrec.event), evrec.time_started,
    118             evrec.time_elapsed))
    119 
    120       if not self.last.has_finished:
    121         records.append(_EventRecord(
    122             _Transition(self.last.event,
    123                         'NOW'), self.last.time_started, self.last.time_elapsed))
    124 
    125     return EventHistory(records)
    126 
    127   @staticmethod
    128   def _GetReport(history, report_name):
    129     report = [report_name]
    130 
    131     for num, evrec in enumerate(history, start=1):
    132       time_elapsed = str(evrec.GetTimeElapsedRounded())
    133 
    134       if not evrec.has_finished:
    135         time_elapsed.append(' (not finished)')
    136 
    137       report.append('%d) %s: %s: %s' % (num, evrec.GetTimeStartedFormatted(),
    138                                         evrec.event, time_elapsed))
    139 
    140     report.append('Total Time: %s' % history.GetTotalTime())
    141 
    142     return '\n'.join(report)
    143 
    144   def GetEventReport(self):
    145     return EventHistory._GetReport(self, 'Timeline of events:')
    146 
    147   def GetTransitionEventReport(self):
    148     return EventHistory._GetReport(self.GetTransitionEventHistory(),
    149                                    'Timeline of transition events:')
    150