Home | History | Annotate | Download | only in grpc_testing
      1 # Copyright 2017 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 """Test times."""
     15 
     16 import collections
     17 import logging
     18 import threading
     19 import time as _time
     20 
     21 import grpc
     22 import grpc_testing
     23 
     24 logging.basicConfig()
     25 _LOGGER = logging.getLogger(__name__)
     26 
     27 
     28 def _call(behaviors):
     29     for behavior in behaviors:
     30         try:
     31             behavior()
     32         except Exception:  # pylint: disable=broad-except
     33             _LOGGER.exception('Exception calling behavior "%r"!', behavior)
     34 
     35 
     36 def _call_in_thread(behaviors):
     37     calling = threading.Thread(target=_call, args=(behaviors,))
     38     calling.start()
     39     # NOTE(nathaniel): Because this function is called from "strict" Time
     40     # implementations, it blocks until after all behaviors have terminated.
     41     calling.join()
     42 
     43 
     44 class _State(object):
     45 
     46     def __init__(self):
     47         self.condition = threading.Condition()
     48         self.times_to_behaviors = collections.defaultdict(list)
     49 
     50 
     51 class _Delta(
     52         collections.namedtuple('_Delta', (
     53             'mature_behaviors',
     54             'earliest_mature_time',
     55             'earliest_immature_time',
     56         ))):
     57     pass
     58 
     59 
     60 def _process(state, now):
     61     mature_behaviors = []
     62     earliest_mature_time = None
     63     while state.times_to_behaviors:
     64         earliest_time = min(state.times_to_behaviors)
     65         if earliest_time <= now:
     66             if earliest_mature_time is None:
     67                 earliest_mature_time = earliest_time
     68             earliest_mature_behaviors = state.times_to_behaviors.pop(
     69                 earliest_time)
     70             mature_behaviors.extend(earliest_mature_behaviors)
     71         else:
     72             earliest_immature_time = earliest_time
     73             break
     74     else:
     75         earliest_immature_time = None
     76     return _Delta(mature_behaviors, earliest_mature_time,
     77                   earliest_immature_time)
     78 
     79 
     80 class _Future(grpc.Future):
     81 
     82     def __init__(self, state, behavior, time):
     83         self._state = state
     84         self._behavior = behavior
     85         self._time = time
     86         self._cancelled = False
     87 
     88     def cancel(self):
     89         with self._state.condition:
     90             if self._cancelled:
     91                 return True
     92             else:
     93                 behaviors_at_time = self._state.times_to_behaviors.get(
     94                     self._time)
     95                 if behaviors_at_time is None:
     96                     return False
     97                 else:
     98                     behaviors_at_time.remove(self._behavior)
     99                     if not behaviors_at_time:
    100                         self._state.times_to_behaviors.pop(self._time)
    101                         self._state.condition.notify_all()
    102                     self._cancelled = True
    103                     return True
    104 
    105     def cancelled(self):
    106         with self._state.condition:
    107             return self._cancelled
    108 
    109     def running(self):
    110         raise NotImplementedError()
    111 
    112     def done(self):
    113         raise NotImplementedError()
    114 
    115     def result(self, timeout=None):
    116         raise NotImplementedError()
    117 
    118     def exception(self, timeout=None):
    119         raise NotImplementedError()
    120 
    121     def traceback(self, timeout=None):
    122         raise NotImplementedError()
    123 
    124     def add_done_callback(self, fn):
    125         raise NotImplementedError()
    126 
    127 
    128 class StrictRealTime(grpc_testing.Time):
    129 
    130     def __init__(self):
    131         self._state = _State()
    132         self._active = False
    133         self._calling = None
    134 
    135     def _activity(self):
    136         while True:
    137             with self._state.condition:
    138                 while True:
    139                     now = _time.time()
    140                     delta = _process(self._state, now)
    141                     self._state.condition.notify_all()
    142                     if delta.mature_behaviors:
    143                         self._calling = delta.earliest_mature_time
    144                         break
    145                     self._calling = None
    146                     if delta.earliest_immature_time is None:
    147                         self._active = False
    148                         return
    149                     else:
    150                         timeout = max(0, delta.earliest_immature_time - now)
    151                         self._state.condition.wait(timeout=timeout)
    152             _call(delta.mature_behaviors)
    153 
    154     def _ensure_called_through(self, time):
    155         with self._state.condition:
    156             while ((self._state.times_to_behaviors and
    157                     min(self._state.times_to_behaviors) < time) or
    158                    (self._calling is not None and self._calling < time)):
    159                 self._state.condition.wait()
    160 
    161     def _call_at(self, behavior, time):
    162         with self._state.condition:
    163             self._state.times_to_behaviors[time].append(behavior)
    164             if self._active:
    165                 self._state.condition.notify_all()
    166             else:
    167                 activity = threading.Thread(target=self._activity)
    168                 activity.start()
    169                 self._active = True
    170             return _Future(self._state, behavior, time)
    171 
    172     def time(self):
    173         return _time.time()
    174 
    175     def call_in(self, behavior, delay):
    176         return self._call_at(behavior, _time.time() + delay)
    177 
    178     def call_at(self, behavior, time):
    179         return self._call_at(behavior, time)
    180 
    181     def sleep_for(self, duration):
    182         time = _time.time() + duration
    183         _time.sleep(duration)
    184         self._ensure_called_through(time)
    185 
    186     def sleep_until(self, time):
    187         _time.sleep(max(0, time - _time.time()))
    188         self._ensure_called_through(time)
    189 
    190 
    191 class StrictFakeTime(grpc_testing.Time):
    192 
    193     def __init__(self, time):
    194         self._state = _State()
    195         self._time = time
    196 
    197     def time(self):
    198         return self._time
    199 
    200     def call_in(self, behavior, delay):
    201         if delay <= 0:
    202             _call_in_thread((behavior,))
    203         else:
    204             with self._state.condition:
    205                 time = self._time + delay
    206                 self._state.times_to_behaviors[time].append(behavior)
    207         return _Future(self._state, behavior, time)
    208 
    209     def call_at(self, behavior, time):
    210         with self._state.condition:
    211             if time <= self._time:
    212                 _call_in_thread((behavior,))
    213             else:
    214                 self._state.times_to_behaviors[time].append(behavior)
    215         return _Future(self._state, behavior, time)
    216 
    217     def sleep_for(self, duration):
    218         if 0 < duration:
    219             with self._state.condition:
    220                 self._time += duration
    221                 delta = _process(self._state, self._time)
    222                 _call_in_thread(delta.mature_behaviors)
    223 
    224     def sleep_until(self, time):
    225         with self._state.condition:
    226             if self._time < time:
    227                 self._time = time
    228                 delta = _process(self._state, self._time)
    229                 _call_in_thread(delta.mature_behaviors)
    230