Home | History | Annotate | Download | only in _server
      1 # Copyright 2017 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 import grpc
     16 
     17 
     18 class _RequestIterator(object):
     19 
     20     def __init__(self, rpc, handler):
     21         self._rpc = rpc
     22         self._handler = handler
     23 
     24     def _next(self):
     25         read = self._handler.take_request()
     26         if read.requests_closed:
     27             raise StopIteration()
     28         elif read.terminated:
     29             rpc_error = grpc.RpcError()
     30             self._rpc.add_rpc_error(rpc_error)
     31             raise rpc_error
     32         else:
     33             return read.request
     34 
     35     def __iter__(self):
     36         return self
     37 
     38     def __next__(self):
     39         return self._next()
     40 
     41     def next(self):
     42         return self._next()
     43 
     44 
     45 def _unary_response(argument, implementation, rpc, servicer_context):
     46     try:
     47         response = implementation(argument, servicer_context)
     48     except Exception as exception:  # pylint: disable=broad-except
     49         rpc.application_exception_abort(exception)
     50     else:
     51         rpc.unary_response_complete(response)
     52 
     53 
     54 def _stream_response(argument, implementation, rpc, servicer_context):
     55     try:
     56         response_iterator = implementation(argument, servicer_context)
     57     except Exception as exception:  # pylint: disable=broad-except
     58         rpc.application_exception_abort(exception)
     59     else:
     60         while True:
     61             try:
     62                 response = next(response_iterator)
     63             except StopIteration:
     64                 rpc.stream_response_complete()
     65                 break
     66             except Exception as exception:  # pylint: disable=broad-except
     67                 rpc.application_exception_abort(exception)
     68                 break
     69             else:
     70                 rpc.stream_response(response)
     71 
     72 
     73 def unary_unary(implementation, rpc, request, servicer_context):
     74     _unary_response(request, implementation, rpc, servicer_context)
     75 
     76 
     77 def unary_stream(implementation, rpc, request, servicer_context):
     78     _stream_response(request, implementation, rpc, servicer_context)
     79 
     80 
     81 def stream_unary(implementation, rpc, handler, servicer_context):
     82     _unary_response(
     83         _RequestIterator(rpc, handler), implementation, rpc, servicer_context)
     84 
     85 
     86 def stream_stream(implementation, rpc, handler, servicer_context):
     87     _stream_response(
     88         _RequestIterator(rpc, handler), implementation, rpc, servicer_context)
     89