Home | History | Annotate | Download | only in foundation
      1 # Copyright 2015 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 """Helpful utilities related to the stream module."""
     15 
     16 import logging
     17 import threading
     18 
     19 from grpc.framework.foundation import stream
     20 
     21 _NO_VALUE = object()
     22 logging.basicConfig()
     23 _LOGGER = logging.getLogger(__name__)
     24 
     25 
     26 class TransformingConsumer(stream.Consumer):
     27     """A stream.Consumer that passes a transformation of its input to another."""
     28 
     29     def __init__(self, transformation, downstream):
     30         self._transformation = transformation
     31         self._downstream = downstream
     32 
     33     def consume(self, value):
     34         self._downstream.consume(self._transformation(value))
     35 
     36     def terminate(self):
     37         self._downstream.terminate()
     38 
     39     def consume_and_terminate(self, value):
     40         self._downstream.consume_and_terminate(self._transformation(value))
     41 
     42 
     43 class IterableConsumer(stream.Consumer):
     44     """A Consumer that when iterated over emits the values it has consumed."""
     45 
     46     def __init__(self):
     47         self._condition = threading.Condition()
     48         self._values = []
     49         self._active = True
     50 
     51     def consume(self, value):
     52         with self._condition:
     53             if self._active:
     54                 self._values.append(value)
     55                 self._condition.notify()
     56 
     57     def terminate(self):
     58         with self._condition:
     59             self._active = False
     60             self._condition.notify()
     61 
     62     def consume_and_terminate(self, value):
     63         with self._condition:
     64             if self._active:
     65                 self._values.append(value)
     66                 self._active = False
     67                 self._condition.notify()
     68 
     69     def __iter__(self):
     70         return self
     71 
     72     def __next__(self):
     73         return self.next()
     74 
     75     def next(self):
     76         with self._condition:
     77             while self._active and not self._values:
     78                 self._condition.wait()
     79             if self._values:
     80                 return self._values.pop(0)
     81             else:
     82                 raise StopIteration()
     83 
     84 
     85 class ThreadSwitchingConsumer(stream.Consumer):
     86     """A Consumer decorator that affords serialization and asynchrony."""
     87 
     88     def __init__(self, sink, pool):
     89         self._lock = threading.Lock()
     90         self._sink = sink
     91         self._pool = pool
     92         # True if self._spin has been submitted to the pool to be called once and
     93         # that call has not yet returned, False otherwise.
     94         self._spinning = False
     95         self._values = []
     96         self._active = True
     97 
     98     def _spin(self, sink, value, terminate):
     99         while True:
    100             try:
    101                 if value is _NO_VALUE:
    102                     sink.terminate()
    103                 elif terminate:
    104                     sink.consume_and_terminate(value)
    105                 else:
    106                     sink.consume(value)
    107             except Exception as e:  # pylint:disable=broad-except
    108                 _LOGGER.exception(e)
    109 
    110             with self._lock:
    111                 if terminate:
    112                     self._spinning = False
    113                     return
    114                 elif self._values:
    115                     value = self._values.pop(0)
    116                     terminate = not self._values and not self._active
    117                 elif not self._active:
    118                     value = _NO_VALUE
    119                     terminate = True
    120                 else:
    121                     self._spinning = False
    122                     return
    123 
    124     def consume(self, value):
    125         with self._lock:
    126             if self._active:
    127                 if self._spinning:
    128                     self._values.append(value)
    129                 else:
    130                     self._pool.submit(self._spin, self._sink, value, False)
    131                     self._spinning = True
    132 
    133     def terminate(self):
    134         with self._lock:
    135             if self._active:
    136                 self._active = False
    137                 if not self._spinning:
    138                     self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
    139                     self._spinning = True
    140 
    141     def consume_and_terminate(self, value):
    142         with self._lock:
    143             if self._active:
    144                 self._active = False
    145                 if self._spinning:
    146                     self._values.append(value)
    147                 else:
    148                     self._pool.submit(self._spin, self._sink, value, True)
    149                     self._spinning = True
    150