Home | History | Annotate | Download | only in _cygrpc
      1 # Copyright 2015 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 cimport cpython
     16 
     17 import threading
     18 import time
     19 
     20 _INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
     21     'Internal gRPC call error %d. ' +
     22     'Please report to https://github.com/grpc/grpc/issues')
     23 
     24 
     25 cdef str _call_error_metadata(metadata):
     26   return 'metadata was invalid: %s' % metadata
     27 
     28 
     29 cdef str _call_error_no_metadata(c_call_error):
     30   return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
     31 
     32 
     33 cdef str _call_error(c_call_error, metadata):
     34   if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
     35     return _call_error_metadata(metadata)
     36   else:
     37     return _call_error_no_metadata(c_call_error)
     38 
     39 
     40 cdef _check_call_error_no_metadata(c_call_error):
     41   if c_call_error != GRPC_CALL_OK:
     42     return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
     43   else:
     44     return None
     45 
     46 
     47 cdef _check_and_raise_call_error_no_metadata(c_call_error):
     48   error = _check_call_error_no_metadata(c_call_error)
     49   if error is not None:
     50     raise ValueError(error)
     51 
     52 
     53 cdef _check_call_error(c_call_error, metadata):
     54   if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
     55     return _call_error_metadata(metadata)
     56   else:
     57     return _check_call_error_no_metadata(c_call_error)
     58 
     59 
     60 cdef void _raise_call_error_no_metadata(c_call_error) except *:
     61   raise ValueError(_call_error_no_metadata(c_call_error))
     62 
     63 
     64 cdef void _raise_call_error(c_call_error, metadata) except *:
     65   raise ValueError(_call_error(c_call_error, metadata))
     66 
     67 
cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
  """Shuts down and then destroys the given completion queue.

  Args:
    c_completion_queue: The grpc_completion_queue to dispose of. It must not
      be used again after this call.
  """
  # NOTE(review): no events are pulled from the queue between the two calls;
  # callers are presumably expected to have no operations outstanding on it -
  # confirm against the gRPC Core completion queue contract.
  grpc_completion_queue_shutdown(c_completion_queue)
  grpc_completion_queue_destroy(c_completion_queue)
     71 
     72 
cdef class _CallState:
  """Mutable bookkeeping for a single call.

  Code in this file also reads and writes a c_call attribute on instances; its
  declaration is not part of this block.
  """

  def __cinit__(self):
    # Tags of batches that have been started for the call and not yet removed
    # by the tag-processing functions. Several functions in this file treat an
    # empty set as "this call has nothing outstanding".
    self.due = set()
     77 
     78 
     79 cdef class _ChannelState:
     80 
     81   def __cinit__(self):
     82     self.condition = threading.Condition()
     83     self.open = True
     84     self.integrated_call_states = {}
     85     self.segregated_call_states = set()
     86     self.connectivity_due = set()
     87     self.closed_reason = None
     88 
     89 
     90 cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
     91   cdef grpc_call_error c_call_error
     92   cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
     93   tag.prepare()
     94   cpython.Py_INCREF(tag)
     95   with nogil:
     96     c_call_error = grpc_call_start_batch(
     97         c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
     98   return c_call_error, tag
     99 
    100 
    101 cdef object _operate_from_integrated_call(
    102     _ChannelState channel_state, _CallState call_state, object operations,
    103     object user_tag):
    104   cdef grpc_call_error c_call_error
    105   cdef _BatchOperationTag tag
    106   with channel_state.condition:
    107     if call_state.due:
    108       c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    109       if c_call_error == GRPC_CALL_OK:
    110         call_state.due.add(tag)
    111         channel_state.integrated_call_states[tag] = call_state
    112         return True
    113       else:
    114         _raise_call_error_no_metadata(c_call_error)
    115     else:
    116       return False
    117 
    118 
    119 cdef object _operate_from_segregated_call(
    120     _ChannelState channel_state, _CallState call_state, object operations,
    121     object user_tag):
    122   cdef grpc_call_error c_call_error
    123   cdef _BatchOperationTag tag
    124   with channel_state.condition:
    125     if call_state.due:
    126       c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    127       if c_call_error == GRPC_CALL_OK:
    128         call_state.due.add(tag)
    129         return True
    130       else:
    131         _raise_call_error_no_metadata(c_call_error)
    132     else:
    133       return False
    134 
    135 
    136 cdef _cancel(
    137     _ChannelState channel_state, _CallState call_state, grpc_status_code code,
    138     str details):
    139   cdef grpc_call_error c_call_error
    140   with channel_state.condition:
    141     if call_state.due:
    142       c_call_error = grpc_call_cancel_with_status(
    143           call_state.c_call, code, _encode(details), NULL)
    144       _check_and_raise_call_error_no_metadata(c_call_error)
    145 
    146 
    147 cdef _next_call_event(
    148     _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
    149     on_success, deadline):
    150   tag, event = _latent_event(c_completion_queue, deadline)
    151   with channel_state.condition:
    152     on_success(tag)
    153     channel_state.condition.notify_all()
    154   return event
    155 
    156 
    157 # TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
cdef void _call(
    _ChannelState channel_state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, on_success, int flags, method,
    host, object deadline, CallCredentials credentials,
    object operationses_and_user_tags, object metadata) except *:
  """Invokes an RPC.

  Args:
    channel_state: A _ChannelState with its "open" attribute set to True. RPCs
      may not be invoked on a closed channel.
    call_state: An empty _CallState to be altered (specifically assigned a
      c_call and having its due set populated) if the RPC invocation is
      successful.
    c_completion_queue: A grpc_completion_queue to be used for the call's
      operations.
    on_success: A behavior to be called if attempting to start operations for
      the call succeeds. If called the behavior will be called while holding the
      channel_state condition and passed the tags associated with operations
      that were successfully started for the call.
    flags: Flags to be passed to gRPC Core as part of call creation.
    method: The fully-qualified name of the RPC method being invoked.
    host: A "host" string to be passed to gRPC Core as part of call creation,
      or None to pass no host.
    deadline: A float for the deadline of the RPC, or None if the RPC is to have
      no deadline.
    credentials: A CallCredentials for the RPC or None.
    operationses_and_user_tags: A sequence of length-two sequences the first
      element of which is a sequence of Operations and the second element of
      which is an object to be used as a tag. A SendInitialMetadataOperation
      must be present in the first element of this value.
    metadata: The metadata for this call. Within this function it is only used
      to build the error message when a batch is rejected.

  Raises:
    ValueError: If the channel is closed, if setting the call credentials
      fails, or if gRPC Core rejects any of the batches.
  """
  cdef grpc_slice method_slice
  cdef grpc_slice host_slice
  cdef grpc_slice *host_slice_ptr
  cdef grpc_call_credentials *c_call_credentials
  cdef grpc_call_error c_call_error
  # NOTE(review): the next two locals are declared but never used below.
  cdef tuple error_and_wrapper_tag
  cdef _BatchOperationTag wrapper_tag
  with channel_state.condition:
    if channel_state.open:
      method_slice = _slice_from_bytes(method)
      # A NULL host pointer is passed to Core when no host was given.
      if host is None:
        host_slice_ptr = NULL
      else:
        host_slice = _slice_from_bytes(host)
        host_slice_ptr = &host_slice
      call_state.c_call = grpc_channel_create_call(
          channel_state.c_channel, NULL, flags,
          c_completion_queue, method_slice, host_slice_ptr,
          _timespec_from_time(deadline), NULL)
      # The slices are only needed for call creation; drop this function's
      # references to them right away.
      grpc_slice_unref(method_slice)
      if host_slice_ptr:
        grpc_slice_unref(host_slice)
      if credentials is not None:
        c_call_credentials = credentials.c()
        c_call_error = grpc_call_set_credentials(
            call_state.c_call, c_call_credentials)
        # Released whether or not setting the credentials succeeded.
        grpc_call_credentials_release(c_call_credentials)
        if c_call_error != GRPC_CALL_OK:
          # No batch has been started yet, so the call is simply released.
          grpc_call_unref(call_state.c_call)
          call_state.c_call = NULL
          _raise_call_error_no_metadata(c_call_error)
      started_tags = set()
      for operations, user_tag in operationses_and_user_tags:
        c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
        if c_call_error == GRPC_CALL_OK:
          started_tags.add(tag)
        else:
          # Earlier batches may already be running, so cancel the call before
          # releasing it. The tags in started_tags are not recorded anywhere
          # on this path.
          grpc_call_cancel(call_state.c_call, NULL)
          grpc_call_unref(call_state.c_call)
          call_state.c_call = NULL
          _raise_call_error(c_call_error, metadata)
      else:
        # The loop contains no "break", so this branch runs whenever the loop
        # finishes without raising, i.e. when every batch was started.
        call_state.due.update(started_tags)
        on_success(started_tags)
    else:
      raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
    235 cdef void _process_integrated_call_tag(
    236     _ChannelState state, _BatchOperationTag tag) except *:
    237   cdef _CallState call_state = state.integrated_call_states.pop(tag)
    238   call_state.due.remove(tag)
    239   if not call_state.due:
    240     grpc_call_unref(call_state.c_call)
    241     call_state.c_call = NULL
    242 
    243 
    244 cdef class IntegratedCall:
    245 
    246   def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    247     self._channel_state = channel_state
    248     self._call_state = call_state
    249 
    250   def operate(self, operations, tag):
    251     return _operate_from_integrated_call(
    252         self._channel_state, self._call_state, operations, tag)
    253 
    254   def cancel(self, code, details):
    255     _cancel(self._channel_state, self._call_state, code, details)
    256 
    257 
    258 cdef IntegratedCall _integrated_call(
    259     _ChannelState state, int flags, method, host, object deadline,
    260     object metadata, CallCredentials credentials, operationses_and_user_tags):
    261   call_state = _CallState()
    262 
    263   def on_success(started_tags):
    264     for started_tag in started_tags:
    265       state.integrated_call_states[started_tag] = call_state
    266 
    267   _call(
    268       state, call_state, state.c_call_completion_queue, on_success, flags,
    269       method, host, deadline, credentials, operationses_and_user_tags, metadata)
    270 
    271   return IntegratedCall(state, call_state)
    272 
    273 
    274 cdef object _process_segregated_call_tag(
    275     _ChannelState state, _CallState call_state,
    276     grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
    277   call_state.due.remove(tag)
    278   if not call_state.due:
    279     grpc_call_unref(call_state.c_call)
    280     call_state.c_call = NULL
    281     state.segregated_call_states.remove(call_state)
    282     _destroy_c_completion_queue(c_completion_queue)
    283     return True
    284   else:
    285     return False
    286 
    287 
    288 cdef class SegregatedCall:
    289 
    290   def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    291     self._channel_state = channel_state
    292     self._call_state = call_state
    293 
    294   def operate(self, operations, tag):
    295     return _operate_from_segregated_call(
    296         self._channel_state, self._call_state, operations, tag)
    297 
    298   def cancel(self, code, details):
    299     _cancel(self._channel_state, self._call_state, code, details)
    300 
    301   def next_event(self):
    302     def on_success(tag):
    303       _process_segregated_call_tag(
    304         self._channel_state, self._call_state, self._c_completion_queue, tag)
    305     return _next_call_event(
    306         self._channel_state, self._c_completion_queue, on_success, None)
    307 
    308 
    309 cdef SegregatedCall _segregated_call(
    310     _ChannelState state, int flags, method, host, object deadline,
    311     object metadata, CallCredentials credentials, operationses_and_user_tags):
    312   cdef _CallState call_state = _CallState()
    313   cdef SegregatedCall segregated_call
    314   cdef grpc_completion_queue *c_completion_queue
    315 
    316   def on_success(started_tags):
    317     state.segregated_call_states.add(call_state)
    318 
    319   with state.condition:
    320     if state.open:
    321       c_completion_queue = (grpc_completion_queue_create_for_next(NULL))
    322     else:
    323       raise ValueError('Cannot invoke RPC on closed channel!')
    324 
    325   try:
    326     _call(
    327         state, call_state, c_completion_queue, on_success, flags, method, host,
    328         deadline, credentials, operationses_and_user_tags, metadata)
    329   except:
    330     _destroy_c_completion_queue(c_completion_queue)
    331     raise
    332 
    333   segregated_call = SegregatedCall(state, call_state)
    334   segregated_call._c_completion_queue = c_completion_queue
    335   return segregated_call
    336 
    337 
    338 cdef object _watch_connectivity_state(
    339     _ChannelState state, grpc_connectivity_state last_observed_state,
    340     object deadline):
    341   cdef _ConnectivityTag tag = _ConnectivityTag(object())
    342   with state.condition:
    343     if state.open:
    344       cpython.Py_INCREF(tag)
    345       grpc_channel_watch_connectivity_state(
    346           state.c_channel, last_observed_state, _timespec_from_time(deadline),
    347           state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
    348       state.connectivity_due.add(tag)
    349     else:
    350       raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)
    351   completed_tag, event = _latent_event(
    352       state.c_connectivity_completion_queue, None)
    353   with state.condition:
    354     state.connectivity_due.remove(completed_tag)
    355     state.condition.notify_all()
    356   return event
    357 
    358 
cdef _close(Channel channel, grpc_status_code code, object details,
    drain_calls):
  """Closes a channel, cancelling its calls and releasing its Core resources.

  The first invocation for a channel marks it closed, cancels every call
  tracked in its state, waits until nothing is outstanding, and then destroys
  the completion queues and the channel. Later invocations only wait for that
  teardown to have finished.

  Args:
    channel: The Channel to close.
    code: The status code with which outstanding calls are cancelled.
    details: The status details with which outstanding calls are cancelled;
      also recorded as the channel's closed_reason.
    drain_calls: If true, this function pulls call events itself until nothing
      is outstanding; otherwise it waits on the state's condition for the
      bookkeeping to empty.
  """
  cdef _ChannelState state = channel._state
  cdef _CallState call_state
  encoded_details = _encode(details)
  with state.condition:
    if state.open:
      state.open = False
      state.closed_reason = details
      # Several tags can map to the same call state; set() cancels each call
      # only once.
      for call_state in set(state.integrated_call_states.values()):
        grpc_call_cancel_with_status(
            call_state.c_call, code, encoded_details, NULL)
      for call_state in state.segregated_call_states:
        grpc_call_cancel_with_status(
            call_state.c_call, code, encoded_details, NULL)
      # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
      # watching.

      if drain_calls:
        # NOTE(review): next_call_event() takes state.condition again while it
        # is already held here, so this relies on the condition's lock being
        # reentrant. It also only pulls from the shared call completion queue,
        # while _calls_drained() additionally looks at segregated calls and
        # connectivity watches - confirm those are expected to be empty here.
        while not _calls_drained(state):
          event = channel.next_call_event()
          if event.completion_type == CompletionType.queue_timeout:
              continue
          event.tag(event)
      else:
        # Each wait() releases the condition so that other threads can retire
        # tags and notify.
        while state.integrated_call_states:
          state.condition.wait()
        while state.segregated_call_states:
          state.condition.wait()
        while state.connectivity_due:
          state.condition.wait()

      _destroy_c_completion_queue(state.c_call_completion_queue)
      _destroy_c_completion_queue(state.c_connectivity_completion_queue)
      grpc_channel_destroy(state.c_channel)
      # A NULL c_channel is what the other branch below waits for.
      state.c_channel = NULL
      grpc_shutdown()
      state.condition.notify_all()
    else:
      # Another call to close already completed in the past or is currently
      # being executed in another thread.
      while state.c_channel != NULL:
        state.condition.wait()
    402 
    403 
    404 cdef _calls_drained(_ChannelState state):
    405   return not (state.integrated_call_states or state.segregated_call_states or
    406               state.connectivity_due)
    407 
    408 cdef class Channel:
    409 
    410   def __cinit__(
    411       self, bytes target, object arguments,
    412       ChannelCredentials channel_credentials):
    413     arguments = () if arguments is None else tuple(arguments)
    414     fork_handlers_and_grpc_init()
    415     self._state = _ChannelState()
    416     self._vtable.copy = &_copy_pointer
    417     self._vtable.destroy = &_destroy_pointer
    418     self._vtable.cmp = &_compare_pointer
    419     cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor(
    420         arguments)
    421     cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable)
    422     if channel_credentials is None:
    423       self._state.c_channel = grpc_insecure_channel_create(
    424           <char *>target, c_arguments, NULL)
    425     else:
    426       c_channel_credentials = channel_credentials.c()
    427       self._state.c_channel = grpc_secure_channel_create(
    428           c_channel_credentials, <char *>target, c_arguments, NULL)
    429       grpc_channel_credentials_release(c_channel_credentials)
    430     self._state.c_call_completion_queue = (
    431         grpc_completion_queue_create_for_next(NULL))
    432     self._state.c_connectivity_completion_queue = (
    433         grpc_completion_queue_create_for_next(NULL))
    434     self._arguments = arguments
    435 
    436   def target(self):
    437     cdef char *c_target
    438     with self._state.condition:
    439       c_target = grpc_channel_get_target(self._state.c_channel)
    440       target = <bytes>c_target
    441       gpr_free(c_target)
    442       return target
    443 
    444   def integrated_call(
    445       self, int flags, method, host, object deadline, object metadata,
    446       CallCredentials credentials, operationses_and_tags):
    447     return _integrated_call(
    448         self._state, flags, method, host, deadline, metadata, credentials,
    449         operationses_and_tags)
    450 
    451   def next_call_event(self):
    452     def on_success(tag):
    453       if tag is not None:
    454         _process_integrated_call_tag(self._state, tag)
    455     if is_fork_support_enabled():
    456       queue_deadline = time.time() + 1.0
    457     else:
    458       queue_deadline = None
    459     return _next_call_event(self._state, self._state.c_call_completion_queue,
    460                             on_success, queue_deadline)
    461 
    462   def segregated_call(
    463       self, int flags, method, host, object deadline, object metadata,
    464       CallCredentials credentials, operationses_and_tags):
    465     return _segregated_call(
    466         self._state, flags, method, host, deadline, metadata, credentials,
    467         operationses_and_tags)
    468 
    469   def check_connectivity_state(self, bint try_to_connect):
    470     with self._state.condition:
    471       if self._state.open:
    472         return grpc_channel_check_connectivity_state(
    473             self._state.c_channel, try_to_connect)
    474       else:
    475         raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)
    476 
    477   def watch_connectivity_state(
    478       self, grpc_connectivity_state last_observed_state, object deadline):
    479     return _watch_connectivity_state(self._state, last_observed_state, deadline)
    480 
    481   def close(self, code, details):
    482     _close(self, code, details, False)
    483 
    484   def close_on_fork(self, code, details):
    485     _close(self, code, details, True)
    486