Home | History | Annotate | Download | only in _channel
      1 # Copyright 2017 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 import logging
     16 import threading
     17 
     18 import grpc
     19 
# Sentinel stored in _HandlerExtras.unary_response until a response is cached.
_NOT_YET_OBSERVED = object()
# Runs at import time; configures the root logger only if it has no handlers.
logging.basicConfig()
# Module-level logger, used when iterating a request iterator fails.
_LOGGER = logging.getLogger(__name__)
     23 
     24 
def _cancel(handler):
    """Cancel via handler with status CANCELLED and a fixed details string.

    Returns whatever handler.cancel returns.
    """
    return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
     27 
     28 
def _is_active(handler):
    """Return the result of handler.is_active()."""
    return handler.is_active()
     31 
     32 
def _time_remaining(unused_handler):
    """Not supported: always raises NotImplementedError.

    The handler argument is ignored.
    """
    raise NotImplementedError()
     35 
     36 
def _add_callback(handler, callback):
    """Register callback with handler; returns handler.add_callback's result."""
    return handler.add_callback(callback)
     39 
     40 
def _initial_metadata(handler):
    """Return the result of handler.initial_metadata()."""
    return handler.initial_metadata()
     43 
     44 
     45 def _trailing_metadata(handler):
     46     trailing_metadata, unused_code, unused_details = handler.termination()
     47     return trailing_metadata
     48 
     49 
     50 def _code(handler):
     51     unused_trailing_metadata, code, unused_details = handler.termination()
     52     return code
     53 
     54 
     55 def _details(handler):
     56     unused_trailing_metadata, unused_code, details = handler.termination()
     57     return details
     58 
     59 
class _Call(grpc.Call):
    """grpc.Call implementation that forwards every method to a handler."""

    def __init__(self, handler):
        # The object that every method below delegates to.
        self._handler = handler

    def cancel(self):
        """Cancel through the handler; the helper's result is discarded."""
        _cancel(self._handler)

    def is_active(self):
        """Return the handler's is_active() result."""
        return _is_active(self._handler)

    def time_remaining(self):
        """Not supported: the helper always raises NotImplementedError."""
        return _time_remaining(self._handler)

    def add_callback(self, callback):
        """Register callback with the handler and return its result."""
        return _add_callback(self._handler, callback)

    def initial_metadata(self):
        """Return the handler's initial metadata."""
        return _initial_metadata(self._handler)

    def trailing_metadata(self):
        """Return the trailing metadata from the handler's termination."""
        return _trailing_metadata(self._handler)

    def code(self):
        """Return the status code from the handler's termination."""
        return _code(self._handler)

    def details(self):
        """Return the status details from the handler's termination."""
        return _details(self._handler)
     88 
     89 
class _RpcErrorCall(grpc.RpcError, grpc.Call):
    """Raisable grpc.RpcError that also forwards grpc.Call methods to a handler."""

    def __init__(self, handler):
        # The object that every method below delegates to.
        self._handler = handler

    def cancel(self):
        """Cancel through the handler; the helper's result is discarded."""
        _cancel(self._handler)

    def is_active(self):
        """Return the handler's is_active() result."""
        return _is_active(self._handler)

    def time_remaining(self):
        """Not supported: the helper always raises NotImplementedError."""
        return _time_remaining(self._handler)

    def add_callback(self, callback):
        """Register callback with the handler and return its result."""
        return _add_callback(self._handler, callback)

    def initial_metadata(self):
        """Return the handler's initial metadata."""
        return _initial_metadata(self._handler)

    def trailing_metadata(self):
        """Return the trailing metadata from the handler's termination."""
        return _trailing_metadata(self._handler)

    def code(self):
        """Return the status code from the handler's termination."""
        return _code(self._handler)

    def details(self):
        """Return the status details from the handler's termination."""
        return _details(self._handler)
    118 
    119 
    120 def _next(handler):
    121     read = handler.take_response()
    122     if read.code is None:
    123         return read.response
    124     elif read.code is grpc.StatusCode.OK:
    125         raise StopIteration()
    126     else:
    127         raise _RpcErrorCall(handler)
    128 
    129 
    130 class _HandlerExtras(object):
    131 
    132     def __init__(self):
    133         self.condition = threading.Condition()
    134         self.unary_response = _NOT_YET_OBSERVED
    135         self.cancelled = False
    136 
    137 
    138 def _with_extras_cancel(handler, extras):
    139     with extras.condition:
    140         if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
    141             extras.cancelled = True
    142             return True
    143         else:
    144             return False
    145 
    146 
    147 def _extras_without_cancelled(extras):
    148     with extras.condition:
    149         return extras.cancelled
    150 
    151 
def _running(handler):
    """Return the result of handler.is_active()."""
    return handler.is_active()
    154 
    155 
    156 def _done(handler):
    157     return not handler.is_active()
    158 
    159 
    160 def _with_extras_unary_response(handler, extras):
    161     with extras.condition:
    162         if extras.unary_response is _NOT_YET_OBSERVED:
    163             read = handler.take_response()
    164             if read.code is None:
    165                 extras.unary_response = read.response
    166                 return read.response
    167             else:
    168                 raise _RpcErrorCall(handler)
    169         else:
    170             return extras.unary_response
    171 
    172 
def _exception(unused_handler):
    """Not implemented yet: always raises NotImplementedError.

    The handler argument is ignored.
    """
    raise NotImplementedError('TODO!')
    175 
    176 
def _traceback(unused_handler):
    """Not implemented yet: always raises NotImplementedError.

    The handler argument is ignored.
    """
    raise NotImplementedError('TODO!')
    179 
    180 
    181 def _add_done_callback(handler, callback, future):
    182     adapted_callback = lambda: callback(future)
    183     if not handler.add_callback(adapted_callback):
    184         callback(future)
    185 
    186 
    187 class _FutureCall(grpc.Future, grpc.Call):
    188 
    189     def __init__(self, handler, extras):
    190         self._handler = handler
    191         self._extras = extras
    192 
    193     def cancel(self):
    194         return _with_extras_cancel(self._handler, self._extras)
    195 
    196     def cancelled(self):
    197         return _extras_without_cancelled(self._extras)
    198 
    199     def running(self):
    200         return _running(self._handler)
    201 
    202     def done(self):
    203         return _done(self._handler)
    204 
    205     def result(self):
    206         return _with_extras_unary_response(self._handler, self._extras)
    207 
    208     def exception(self):
    209         return _exception(self._handler)
    210 
    211     def traceback(self):
    212         return _traceback(self._handler)
    213 
    214     def add_done_callback(self, fn):
    215         _add_done_callback(self._handler, fn, self)
    216 
    217     def is_active(self):
    218         return _is_active(self._handler)
    219 
    220     def time_remaining(self):
    221         return _time_remaining(self._handler)
    222 
    223     def add_callback(self, callback):
    224         return _add_callback(self._handler, callback)
    225 
    226     def initial_metadata(self):
    227         return _initial_metadata(self._handler)
    228 
    229     def trailing_metadata(self):
    230         return _trailing_metadata(self._handler)
    231 
    232     def code(self):
    233         return _code(self._handler)
    234 
    235     def details(self):
    236         return _details(self._handler)
    237 
    238 
    239 def consume_requests(request_iterator, handler):
    240 
    241     def _consume():
    242         while True:
    243             try:
    244                 request = next(request_iterator)
    245                 added = handler.add_request(request)
    246                 if not added:
    247                     break
    248             except StopIteration:
    249                 handler.close_requests()
    250                 break
    251             except Exception:  # pylint: disable=broad-except
    252                 details = 'Exception iterating requests!'
    253                 _LOGGER.exception(details)
    254                 handler.cancel(grpc.StatusCode.UNKNOWN, details)
    255 
    256     consumption = threading.Thread(target=_consume)
    257     consumption.start()
    258 
    259 
    260 def blocking_unary_response(handler):
    261     read = handler.take_response()
    262     if read.code is None:
    263         unused_trailing_metadata, code, unused_details = handler.termination()
    264         if code is grpc.StatusCode.OK:
    265             return read.response
    266         else:
    267             raise _RpcErrorCall(handler)
    268     else:
    269         raise _RpcErrorCall(handler)
    270 
    271 
    272 def blocking_unary_response_with_call(handler):
    273     read = handler.take_response()
    274     if read.code is None:
    275         unused_trailing_metadata, code, unused_details = handler.termination()
    276         if code is grpc.StatusCode.OK:
    277             return read.response, _Call(handler)
    278         else:
    279             raise _RpcErrorCall(handler)
    280     else:
    281         raise _RpcErrorCall(handler)
    282 
    283 
    284 def future_call(handler):
    285     return _FutureCall(handler, _HandlerExtras())
    286 
    287 
class ResponseIteratorCall(grpc.Call):
    """Iterator over a handler's responses that is also a grpc.Call."""

    def __init__(self, handler):
        # The object that every method below delegates to.
        self._handler = handler

    def __iter__(self):
        """Return self; this object is its own iterator."""
        return self

    def __next__(self):
        """Return the next response via the module-level _next helper."""
        return _next(self._handler)

    def next(self):
        """Python 2 iterator-protocol spelling; same behaviour as __next__."""
        return _next(self._handler)

    def cancel(self):
        """Cancel through the handler; the helper's result is discarded."""
        _cancel(self._handler)

    def is_active(self):
        """Return the handler's is_active() result."""
        return _is_active(self._handler)

    def time_remaining(self):
        """Not supported: the helper always raises NotImplementedError."""
        return _time_remaining(self._handler)

    def add_callback(self, callback):
        """Register callback with the handler and return its result."""
        return _add_callback(self._handler, callback)

    def initial_metadata(self):
        """Return the handler's initial metadata."""
        return _initial_metadata(self._handler)

    def trailing_metadata(self):
        """Return the trailing metadata from the handler's termination."""
        return _trailing_metadata(self._handler)

    def code(self):
        """Return the status code from the handler's termination."""
        return _code(self._handler)

    def details(self):
        """Return the status details from the handler's termination."""
        return _details(self._handler)
    325