Home | History | Annotate | Download | only in python2.7
      1 """A generally useful event scheduler class.
      2 
      3 Each instance of this class manages its own queue.
      4 No multi-threading is implied; you are supposed to hack that
      5 yourself, or use a single instance per application.
      6 
      7 Each instance is parametrized with two functions, one that is
      8 supposed to return the current time, one that is supposed to
      9 implement a delay.  You can implement real-time scheduling by
     10 substituting time and sleep from built-in module time, or you can
     11 implement simulated time by writing your own functions.  This can
     12 also be used to integrate scheduling with STDWIN events; the delay
     13 function is allowed to modify the queue.  Time can be expressed as
     14 integers or floating point numbers, as long as it is consistent.
     15 
     16 Events are specified by tuples (time, priority, action, argument).
     17 As in UNIX, lower priority numbers mean higher priority; in this
     18 way the queue can be maintained as a priority queue.  Execution of the
     19 event means calling the action function, passing it the argument
     20 sequence in "argument" (remember that in Python, multiple function
     21 arguments are packed in a sequence).
     22 The action function may be an instance method so it
     23 has another way to reference private data (besides global variables).
     24 """
     25 
     26 # XXX The timefunc and delayfunc should have been defined as methods
     27 # XXX so you can define new kinds of schedulers using subclassing
     28 # XXX instead of having to define a module or class just to hold
     29 # XXX the global state of your particular time and delay functions.
     30 
     31 import heapq
     32 from collections import namedtuple
     33 
     34 __all__ = ["scheduler"]
     35 
     36 Event = namedtuple('Event', 'time, priority, action, argument')
     37 
     38 class scheduler:
     39     def __init__(self, timefunc, delayfunc):
     40         """Initialize a new instance, passing the time and delay
     41         functions"""
     42         self._queue = []
     43         self.timefunc = timefunc
     44         self.delayfunc = delayfunc
     45 
     46     def enterabs(self, time, priority, action, argument):
     47         """Enter a new event in the queue at an absolute time.
     48 
     49         Returns an ID for the event which can be used to remove it,
     50         if necessary.
     51 
     52         """
     53         event = Event(time, priority, action, argument)
     54         heapq.heappush(self._queue, event)
     55         return event # The ID
     56 
     57     def enter(self, delay, priority, action, argument):
     58         """A variant that specifies the time as a relative time.
     59 
     60         This is actually the more commonly used interface.
     61 
     62         """
     63         time = self.timefunc() + delay
     64         return self.enterabs(time, priority, action, argument)
     65 
     66     def cancel(self, event):
     67         """Remove an event from the queue.
     68 
     69         This must be presented the ID as returned by enter().
     70         If the event is not in the queue, this raises ValueError.
     71 
     72         """
     73         self._queue.remove(event)
     74         heapq.heapify(self._queue)
     75 
     76     def empty(self):
     77         """Check whether the queue is empty."""
     78         return not self._queue
     79 
     80     def run(self):
     81         """Execute events until the queue is empty.
     82 
     83         When there is a positive delay until the first event, the
     84         delay function is called and the event is left in the queue;
     85         otherwise, the event is removed from the queue and executed
     86         (its action function is called, passing it the argument).  If
     87         the delay function returns prematurely, it is simply
     88         restarted.
     89 
     90         It is legal for both the delay function and the action
     91         function to modify the queue or to raise an exception;
     92         exceptions are not caught but the scheduler's state remains
     93         well-defined so run() may be called again.
     94 
     95         A questionable hack is added to allow other threads to run:
     96         just after an event is executed, a delay of 0 is executed, to
     97         avoid monopolizing the CPU when other threads are also
     98         runnable.
     99 
    100         """
    101         # localize variable access to minimize overhead
    102         # and to improve thread safety
    103         q = self._queue
    104         delayfunc = self.delayfunc
    105         timefunc = self.timefunc
    106         pop = heapq.heappop
    107         while q:
    108             time, priority, action, argument = checked_event = q[0]
    109             now = timefunc()
    110             if now < time:
    111                 delayfunc(time - now)
    112             else:
    113                 event = pop(q)
    114                 # Verify that the event was not removed or altered
    115                 # by another thread after we last looked at q[0].
    116                 if event is checked_event:
    117                     action(*argument)
    118                     delayfunc(0)   # Let other threads run
    119                 else:
    120                     heapq.heappush(q, event)
    121 
    122     @property
    123     def queue(self):
    124         """An ordered list of upcoming events.
    125 
    126         Events are named tuples with fields for:
    127             time, priority, action, arguments
    128 
    129         """
    130         # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
    131         # With heapq, two events scheduled at the same time will show in
    132         # the actual order they would be retrieved.
    133         events = self._queue[:]
    134         return map(heapq.heappop, [events]*len(events))
    135