Home | History | Annotate | Download | only in controllers
      1 #
      2 #   Copyright 2016- The Android Open Source Project
      3 #
      4 #   Licensed under the Apache License, Version 2.0 (the "License");
      5 #   you may not use this file except in compliance with the License.
      6 #   You may obtain a copy of the License at
      7 #
      8 #       http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 #   Unless required by applicable law or agreed to in writing, software
     11 #   distributed under the License is distributed on an "AS IS" BASIS,
     12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 #   See the License for the specific language governing permissions and
     14 #   limitations under the License.
     15 
     16 from concurrent.futures import ThreadPoolExecutor
     17 import queue
     18 import re
     19 import socket
     20 import threading
     21 import time
     22 import traceback
     23 
     24 
     25 class EventDispatcherError(Exception):
     26     pass
     27 
     28 
     29 class IllegalStateError(EventDispatcherError):
     30     """Raise when user tries to put event_dispatcher into an illegal state.
     31     """
     32 
     33 
     34 class DuplicateError(EventDispatcherError):
     35     """Raise when a duplicate is being created and it shouldn't.
     36     """
     37 
     38 
     39 class EventDispatcher:
     40     """Class managing events for an sl4a connection.
     41     """
     42 
     43     DEFAULT_TIMEOUT = 60
     44 
     45     def __init__(self, droid):
     46         self.droid = droid
     47         self.started = False
     48         self.executor = None
     49         self.poller = None
     50         self.event_dict = {}
     51         self.handlers = {}
     52         self.lock = threading.RLock()
     53 
     54     def poll_events(self):
     55         """Continuously polls all types of events from sl4a.
     56 
     57         Events are sorted by name and store in separate queues.
     58         If there are registered handlers, the handlers will be called with
     59         corresponding event immediately upon event discovery, and the event
     60         won't be stored. If exceptions occur, stop the dispatcher and return
     61         """
     62         while self.started:
     63             event_obj = None
     64             event_name = None
     65             try:
     66                 event_obj = self.droid.eventWait(50000)
     67             except:
     68                 if self.started:
     69                     print("Exception happened during polling.")
     70                     print(traceback.format_exc())
     71                     raise
     72             if not event_obj:
     73                 continue
     74             elif 'name' not in event_obj:
     75                 print("Received Malformed event {}".format(event_obj))
     76                 continue
     77             else:
     78                 event_name = event_obj['name']
     79             # if handler registered, process event
     80             if event_name in self.handlers:
     81                 self.handle_subscribed_event(event_obj, event_name)
     82             if event_name == "EventDispatcherShutdown":
     83                 self.droid.closeSl4aSession()
     84                 break
     85             else:
     86                 self.lock.acquire()
     87                 if event_name in self.event_dict:  # otherwise, cache event
     88                     self.event_dict[event_name].put(event_obj)
     89                 else:
     90                     q = queue.Queue()
     91                     q.put(event_obj)
     92                     self.event_dict[event_name] = q
     93                 self.lock.release()
     94 
     95     def register_handler(self, handler, event_name, args):
     96         """Registers an event handler.
     97 
     98         One type of event can only have one event handler associated with it.
     99 
    100         Args:
    101             handler: The event handler function to be registered.
    102             event_name: Name of the event the handler is for.
    103             args: User arguments to be passed to the handler when it's called.
    104 
    105         Raises:
    106             IllegalStateError: Raised if attempts to register a handler after
    107                 the dispatcher starts running.
    108             DuplicateError: Raised if attempts to register more than one
    109                 handler for one type of event.
    110         """
    111         if self.started:
    112             raise IllegalStateError(("Can't register service after polling is"
    113                                      " started"))
    114         self.lock.acquire()
    115         try:
    116             if event_name in self.handlers:
    117                 raise DuplicateError('A handler for {} already exists'.format(
    118                     event_name))
    119             self.handlers[event_name] = (handler, args)
    120         finally:
    121             self.lock.release()
    122 
    123     def start(self):
    124         """Starts the event dispatcher.
    125 
    126         Initiates executor and start polling events.
    127 
    128         Raises:
    129             IllegalStateError: Can't start a dispatcher again when it's already
    130                 running.
    131         """
    132         if not self.started:
    133             self.started = True
    134             self.executor = ThreadPoolExecutor(max_workers=32)
    135             self.poller = self.executor.submit(self.poll_events)
    136         else:
    137             raise IllegalStateError("Dispatcher is already started.")
    138 
    139     def clean_up(self):
    140         """Clean up and release resources after the event dispatcher polling
    141         loop has been broken.
    142 
    143         The following things happen:
    144         1. Clear all events and flags.
    145         2. Close the sl4a client the event_dispatcher object holds.
    146         3. Shut down executor without waiting.
    147         """
    148         uid = self.droid.uid
    149         if not self.started:
    150             return
    151         self.started = False
    152         self.clear_all_events()
    153         self.droid.close()
    154         self.poller.set_result("Done")
    155         # The polling thread is guaranteed to finish after a max of 60 seconds,
    156         # so we don't wait here.
    157         self.executor.shutdown(wait=False)
    158 
    159     def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
    160         """Pop an event from its queue.
    161 
    162         Return and remove the oldest entry of an event.
    163         Block until an event of specified name is available or
    164         times out if timeout is set.
    165 
    166         Args:
    167             event_name: Name of the event to be popped.
    168             timeout: Number of seconds to wait when event is not present.
    169                 Never times out if None.
    170 
    171         Returns:
    172             event: The oldest entry of the specified event. None if timed out.
    173 
    174         Raises:
    175             IllegalStateError: Raised if pop is called before the dispatcher
    176                 starts polling.
    177         """
    178         if not self.started:
    179             raise IllegalStateError(
    180                 "Dispatcher needs to be started before popping.")
    181 
    182         e_queue = self.get_event_q(event_name)
    183 
    184         if not e_queue:
    185             raise TypeError("Failed to get an event queue for {}".format(
    186                 event_name))
    187 
    188         try:
    189             # Block for timeout
    190             if timeout:
    191                 return e_queue.get(True, timeout)
    192             # Non-blocking poll for event
    193             elif timeout == 0:
    194                 return e_queue.get(False)
    195             else:
    196                 # Block forever on event wait
    197                 return e_queue.get(True)
    198         except queue.Empty:
    199             raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
    200                 timeout, event_name))
    201 
    202     def wait_for_event(self,
    203                        event_name,
    204                        predicate,
    205                        timeout=DEFAULT_TIMEOUT,
    206                        *args,
    207                        **kwargs):
    208         """Wait for an event that satisfies a predicate to appear.
    209 
    210         Continuously pop events of a particular name and check against the
    211         predicate until an event that satisfies the predicate is popped or
    212         timed out. Note this will remove all the events of the same name that
    213         do not satisfy the predicate in the process.
    214 
    215         Args:
    216             event_name: Name of the event to be popped.
    217             predicate: A function that takes an event and returns True if the
    218                 predicate is satisfied, False otherwise.
    219             timeout: Number of seconds to wait.
    220             *args: Optional positional args passed to predicate().
    221             **kwargs: Optional keyword args passed to predicate().
    222 
    223         Returns:
    224             The event that satisfies the predicate.
    225 
    226         Raises:
    227             queue.Empty: Raised if no event that satisfies the predicate was
    228                 found before time out.
    229         """
    230         deadline = time.time() + timeout
    231 
    232         while True:
    233             event = None
    234             try:
    235                 event = self.pop_event(event_name, 1)
    236             except queue.Empty:
    237                 pass
    238 
    239             if event and predicate(event, *args, **kwargs):
    240                 return event
    241 
    242             if time.time() > deadline:
    243                 raise queue.Empty(
    244                     'Timeout after {}s waiting for event: {}'.format(
    245                         timeout, event_name))
    246 
    247     def pop_events(self, regex_pattern, timeout):
    248         """Pop events whose names match a regex pattern.
    249 
    250         If such event(s) exist, pop one event from each event queue that
    251         satisfies the condition. Otherwise, wait for an event that satisfies
    252         the condition to occur, with timeout.
    253 
    254         Results are sorted by timestamp in ascending order.
    255 
    256         Args:
    257             regex_pattern: The regular expression pattern that an event name
    258                 should match in order to be popped.
    259             timeout: Number of seconds to wait for events in case no event
    260                 matching the condition exits when the function is called.
    261 
    262         Returns:
    263             results: Pop events whose names match a regex pattern.
    264                 Empty if none exist and the wait timed out.
    265 
    266         Raises:
    267             IllegalStateError: Raised if pop is called before the dispatcher
    268                 starts polling.
    269             queue.Empty: Raised if no event was found before time out.
    270         """
    271         if not self.started:
    272             raise IllegalStateError(
    273                 "Dispatcher needs to be started before popping.")
    274         deadline = time.time() + timeout
    275         while True:
    276             #TODO: fix the sleep loop
    277             results = self._match_and_pop(regex_pattern)
    278             if len(results) != 0 or time.time() > deadline:
    279                 break
    280             time.sleep(1)
    281         if len(results) == 0:
    282             raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
    283                 timeout, regex_pattern))
    284 
    285         return sorted(results, key=lambda event: event['time'])
    286 
    287     def _match_and_pop(self, regex_pattern):
    288         """Pop one event from each of the event queues whose names
    289         match (in a sense of regular expression) regex_pattern.
    290         """
    291         results = []
    292         self.lock.acquire()
    293         for name in self.event_dict.keys():
    294             if re.match(regex_pattern, name):
    295                 q = self.event_dict[name]
    296                 if q:
    297                     try:
    298                         results.append(q.get(False))
    299                     except:
    300                         pass
    301         self.lock.release()
    302         return results
    303 
    304     def get_event_q(self, event_name):
    305         """Obtain the queue storing events of the specified name.
    306 
    307         If no event of this name has been polled, wait for one to.
    308 
    309         Returns:
    310             queue: A queue storing all the events of the specified name.
    311                 None if timed out.
    312             timeout: Number of seconds to wait for the operation.
    313 
    314         Raises:
    315             queue.Empty: Raised if the queue does not exist and timeout has
    316                 passed.
    317         """
    318         self.lock.acquire()
    319         if not event_name in self.event_dict or self.event_dict[
    320                 event_name] is None:
    321             self.event_dict[event_name] = queue.Queue()
    322         self.lock.release()
    323 
    324         event_queue = self.event_dict[event_name]
    325         return event_queue
    326 
    327     def handle_subscribed_event(self, event_obj, event_name):
    328         """Execute the registered handler of an event.
    329 
    330         Retrieve the handler and its arguments, and execute the handler in a
    331             new thread.
    332 
    333         Args:
    334             event_obj: Json object of the event.
    335             event_name: Name of the event to call handler for.
    336         """
    337         handler, args = self.handlers[event_name]
    338         self.executor.submit(handler, event_obj, *args)
    339 
    340     def _handle(self, event_handler, event_name, user_args, event_timeout,
    341                 cond, cond_timeout):
    342         """Pop an event of specified type and calls its handler on it. If
    343         condition is not None, block until condition is met or timeout.
    344         """
    345         if cond:
    346             cond.wait(cond_timeout)
    347         event = self.pop_event(event_name, event_timeout)
    348         return event_handler(event, *user_args)
    349 
    350     def handle_event(self,
    351                      event_handler,
    352                      event_name,
    353                      user_args,
    354                      event_timeout=None,
    355                      cond=None,
    356                      cond_timeout=None):
    357         """Handle events that don't have registered handlers
    358 
    359         In a new thread, poll one event of specified type from its queue and
    360         execute its handler. If no such event exists, the thread waits until
    361         one appears.
    362 
    363         Args:
    364             event_handler: Handler for the event, which should take at least
    365                 one argument - the event json object.
    366             event_name: Name of the event to be handled.
    367             user_args: User arguments for the handler; to be passed in after
    368                 the event json.
    369             event_timeout: Number of seconds to wait for the event to come.
    370             cond: A condition to wait on before executing the handler. Should
    371                 be a threading.Event object.
    372             cond_timeout: Number of seconds to wait before the condition times
    373                 out. Never times out if None.
    374 
    375         Returns:
    376             worker: A concurrent.Future object associated with the handler.
    377                 If blocking call worker.result() is triggered, the handler
    378                 needs to return something to unblock.
    379         """
    380         worker = self.executor.submit(self._handle, event_handler, event_name,
    381                                       user_args, event_timeout, cond,
    382                                       cond_timeout)
    383         return worker
    384 
    385     def pop_all(self, event_name):
    386         """Return and remove all stored events of a specified name.
    387 
    388         Pops all events from their queue. May miss the latest ones.
    389         If no event is available, return immediately.
    390 
    391         Args:
    392             event_name: Name of the events to be popped.
    393 
    394         Returns:
    395            results: List of the desired events.
    396 
    397         Raises:
    398             IllegalStateError: Raised if pop is called before the dispatcher
    399                 starts polling.
    400         """
    401         if not self.started:
    402             raise IllegalStateError(("Dispatcher needs to be started before "
    403                                      "popping."))
    404         results = []
    405         try:
    406             self.lock.acquire()
    407             while True:
    408                 e = self.event_dict[event_name].get(block=False)
    409                 results.append(e)
    410         except (queue.Empty, KeyError):
    411             return results
    412         finally:
    413             self.lock.release()
    414 
    415     def clear_events(self, event_name):
    416         """Clear all events of a particular name.
    417 
    418         Args:
    419             event_name: Name of the events to be popped.
    420         """
    421         self.lock.acquire()
    422         try:
    423             q = self.get_event_q(event_name)
    424             q.queue.clear()
    425         except queue.Empty:
    426             return
    427         finally:
    428             self.lock.release()
    429 
    430     def clear_all_events(self):
    431         """Clear all event queues and their cached events."""
    432         self.lock.acquire()
    433         self.event_dict.clear()
    434         self.lock.release()
    435