Home | History | Annotate | Download | only in Lib
      1 """A generally useful event scheduler class.
      2 
      3 Each instance of this class manages its own queue.
      4 No multi-threading is implied; you are supposed to hack that
      5 yourself, or use a single instance per application.
      6 
      7 Each instance is parametrized with two functions, one that is
      8 supposed to return the current time, one that is supposed to
      9 implement a delay.  You can implement real-time scheduling by
     10 substituting time and sleep from built-in module time, or you can
     11 implement simulated time by writing your own functions.  This can
     12 also be used to integrate scheduling with STDWIN events; the delay
     13 function is allowed to modify the queue.  Time can be expressed as
     14 integers or floating point numbers, as long as it is consistent.
     15 
     16 Events are specified by tuples (time, priority, action, argument, kwargs).
     17 As in UNIX, lower priority numbers mean higher priority; in this
     18 way the queue can be maintained as a priority queue.  Execution of the
     19 event means calling the action function, passing it the argument
     20 sequence in "argument" (remember that in Python, multiple function
     21 arguments are packed in a sequence) and keyword parameters in "kwargs".
     22 The action function may be an instance method so it
     23 has another way to reference private data (besides global variables).
     24 """
     25 
     26 import time
     27 import heapq
     28 from collections import namedtuple
     29 try:
     30     import threading
     31 except ImportError:
     32     import dummy_threading as threading
     33 from time import monotonic as _time
     34 
     35 __all__ = ["scheduler"]
     36 
     37 class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')):
     38     __slots__ = []
     39     def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority)
     40     def __lt__(s, o): return (s.time, s.priority) <  (o.time, o.priority)
     41     def __le__(s, o): return (s.time, s.priority) <= (o.time, o.priority)
     42     def __gt__(s, o): return (s.time, s.priority) >  (o.time, o.priority)
     43     def __ge__(s, o): return (s.time, s.priority) >= (o.time, o.priority)
     44 
     45 Event.time.__doc__ = ('''Numeric type compatible with the return value of the
     46 timefunc function passed to the constructor.''')
     47 Event.priority.__doc__ = ('''Events scheduled for the same time will be executed
     48 in the order of their priority.''')
     49 Event.action.__doc__ = ('''Executing the event means executing
     50 action(*argument, **kwargs)''')
     51 Event.argument.__doc__ = ('''argument is a sequence holding the positional
     52 arguments for the action.''')
     53 Event.kwargs.__doc__ = ('''kwargs is a dictionary holding the keyword
     54 arguments for the action.''')
     55 
     56 _sentinel = object()
     57 
     58 class scheduler:
     59 
     60     def __init__(self, timefunc=_time, delayfunc=time.sleep):
     61         """Initialize a new instance, passing the time and delay
     62         functions"""
     63         self._queue = []
     64         self._lock = threading.RLock()
     65         self.timefunc = timefunc
     66         self.delayfunc = delayfunc
     67 
     68     def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
     69         """Enter a new event in the queue at an absolute time.
     70 
     71         Returns an ID for the event which can be used to remove it,
     72         if necessary.
     73 
     74         """
     75         if kwargs is _sentinel:
     76             kwargs = {}
     77         event = Event(time, priority, action, argument, kwargs)
     78         with self._lock:
     79             heapq.heappush(self._queue, event)
     80         return event # The ID
     81 
     82     def enter(self, delay, priority, action, argument=(), kwargs=_sentinel):
     83         """A variant that specifies the time as a relative time.
     84 
     85         This is actually the more commonly used interface.
     86 
     87         """
     88         time = self.timefunc() + delay
     89         return self.enterabs(time, priority, action, argument, kwargs)
     90 
     91     def cancel(self, event):
     92         """Remove an event from the queue.
     93 
     94         This must be presented the ID as returned by enter().
     95         If the event is not in the queue, this raises ValueError.
     96 
     97         """
     98         with self._lock:
     99             self._queue.remove(event)
    100             heapq.heapify(self._queue)
    101 
    102     def empty(self):
    103         """Check whether the queue is empty."""
    104         with self._lock:
    105             return not self._queue
    106 
    107     def run(self, blocking=True):
    108         """Execute events until the queue is empty.
    109         If blocking is False executes the scheduled events due to
    110         expire soonest (if any) and then return the deadline of the
    111         next scheduled call in the scheduler.
    112 
    113         When there is a positive delay until the first event, the
    114         delay function is called and the event is left in the queue;
    115         otherwise, the event is removed from the queue and executed
    116         (its action function is called, passing it the argument).  If
    117         the delay function returns prematurely, it is simply
    118         restarted.
    119 
    120         It is legal for both the delay function and the action
    121         function to modify the queue or to raise an exception;
    122         exceptions are not caught but the scheduler's state remains
    123         well-defined so run() may be called again.
    124 
    125         A questionable hack is added to allow other threads to run:
    126         just after an event is executed, a delay of 0 is executed, to
    127         avoid monopolizing the CPU when other threads are also
    128         runnable.
    129 
    130         """
    131         # localize variable access to minimize overhead
    132         # and to improve thread safety
    133         lock = self._lock
    134         q = self._queue
    135         delayfunc = self.delayfunc
    136         timefunc = self.timefunc
    137         pop = heapq.heappop
    138         while True:
    139             with lock:
    140                 if not q:
    141                     break
    142                 time, priority, action, argument, kwargs = q[0]
    143                 now = timefunc()
    144                 if time > now:
    145                     delay = True
    146                 else:
    147                     delay = False
    148                     pop(q)
    149             if delay:
    150                 if not blocking:
    151                     return time - now
    152                 delayfunc(time - now)
    153             else:
    154                 action(*argument, **kwargs)
    155                 delayfunc(0)   # Let other threads run
    156 
    157     @property
    158     def queue(self):
    159         """An ordered list of upcoming events.
    160 
    161         Events are named tuples with fields for:
    162             time, priority, action, arguments, kwargs
    163 
    164         """
    165         # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
    166         # With heapq, two events scheduled at the same time will show in
    167         # the actual order they would be retrieved.
    168         with self._lock:
    169             events = self._queue[:]
    170         return list(map(heapq.heappop, [events]*len(events)))
    171