Home | History | Annotate | Download | only in grpc
      1 # Copyright 2016 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 """Invocation-side implementation of gRPC Python."""
     15 
     16 import logging
     17 import sys
     18 import threading
     19 import time
     20 
     21 import grpc
     22 from grpc import _common
     23 from grpc import _grpcio_metadata
     24 from grpc._cython import cygrpc
     25 from grpc.framework.foundation import callable_util
     26 
# NOTE(review): basicConfig() runs at import time, so merely importing this
# module may install a default handler on the root logger.
logging.basicConfig()
_LOGGER = logging.getLogger(__name__)

# User-agent token derived from the installed grpcio version. Its consumer is
# not visible in this section of the file.
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)

# Flags value passed to every cygrpc operation constructed in this module.
_EMPTY_FLAGS = 0

# The *_INITIAL_DUE tuples seed _RPCState.due for each RPC arity. Each one
# lists the operation types submitted when the corresponding call is started;
# _handle_event removes an entry as each operation completes.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# No receive_message here: response messages are requested one at a time by
# _Rendezvous._next.
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
# No send_message/send_close_from_client here: those are added by the request
# consumption thread as it submits them.
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Log message for failures in channel subscription callbacks; presumably used
# by connectivity-subscription code outside this section.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')

# repr() template for a _Rendezvous that terminated with StatusCode.OK:
# filled with (status code, details).
_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
                              '\tstatus = {}\n'
                              '\tdetails = "{}"\n'
                              '>')

# repr() template for a _Rendezvous that terminated with any other status:
# filled with (status code, details, debug error string).
_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
                                  '\tstatus = {}\n'
                                  '\tdetails = "{}"\n'
                                  '\tdebug_error_string = "{}"\n'
                                  '>')
     74 
     75 
     76 def _deadline(timeout):
     77     return None if timeout is None else time.time() + timeout
     78 
     79 
     80 def _unknown_code_details(unknown_cygrpc_code, details):
     81     return 'Server sent unknown code {} and details "{}"'.format(
     82         unknown_cygrpc_code, details)
     83 
     84 
     85 def _wait_once_until(condition, until):
     86     if until is None:
     87         condition.wait()
     88     else:
     89         remaining = until - time.time()
     90         if remaining < 0:
     91             raise grpc.FutureTimeoutError()
     92         else:
     93             condition.wait(timeout=remaining)
     94 
     95 
     96 class _RPCState(object):
     97 
     98     def __init__(self, due, initial_metadata, trailing_metadata, code, details):
     99         self.condition = threading.Condition()
    100         # The cygrpc.OperationType objects representing events due from the RPC's
    101         # completion queue.
    102         self.due = set(due)
    103         self.initial_metadata = initial_metadata
    104         self.response = None
    105         self.trailing_metadata = trailing_metadata
    106         self.code = code
    107         self.details = details
    108         self.debug_error_string = None
    109         # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
    110         # slightly wonky, so they have to be tracked separately from the rest of the
    111         # result of the RPC. This field tracks whether cancellation was requested
    112         # prior to termination of the RPC.
    113         self.cancelled = False
    114         self.callbacks = []
    115         self.fork_epoch = cygrpc.get_fork_epoch()
    116 
    117     def reset_postfork_child(self):
    118         self.condition = threading.Condition()
    119 
    120 
    121 def _abort(state, code, details):
    122     if state.code is None:
    123         state.code = code
    124         state.details = details
    125         if state.initial_metadata is None:
    126             state.initial_metadata = ()
    127         state.trailing_metadata = ()
    128 
    129 
    130 def _handle_event(event, state, response_deserializer):
    131     callbacks = []
    132     for batch_operation in event.batch_operations:
    133         operation_type = batch_operation.type()
    134         state.due.remove(operation_type)
    135         if operation_type == cygrpc.OperationType.receive_initial_metadata:
    136             state.initial_metadata = batch_operation.initial_metadata()
    137         elif operation_type == cygrpc.OperationType.receive_message:
    138             serialized_response = batch_operation.message()
    139             if serialized_response is not None:
    140                 response = _common.deserialize(serialized_response,
    141                                                response_deserializer)
    142                 if response is None:
    143                     details = 'Exception deserializing response!'
    144                     _abort(state, grpc.StatusCode.INTERNAL, details)
    145                 else:
    146                     state.response = response
    147         elif operation_type == cygrpc.OperationType.receive_status_on_client:
    148             state.trailing_metadata = batch_operation.trailing_metadata()
    149             if state.code is None:
    150                 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
    151                     batch_operation.code())
    152                 if code is None:
    153                     state.code = grpc.StatusCode.UNKNOWN
    154                     state.details = _unknown_code_details(
    155                         code, batch_operation.details())
    156                 else:
    157                     state.code = code
    158                     state.details = batch_operation.details()
    159                     state.debug_error_string = batch_operation.error_string()
    160             callbacks.extend(state.callbacks)
    161             state.callbacks = None
    162     return callbacks
    163 
    164 
    165 def _event_handler(state, response_deserializer):
    166 
    167     def handle_event(event):
    168         with state.condition:
    169             callbacks = _handle_event(event, state, response_deserializer)
    170             state.condition.notify_all()
    171             done = not state.due
    172         for callback in callbacks:
    173             callback()
    174         return done and state.fork_epoch >= cygrpc.get_fork_epoch()
    175 
    176     return handle_event
    177 
    178 
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Start a daemon thread that sends the requests of a streaming RPC.

    The thread pulls requests from request_iterator one at a time, serializes
    each, submits it as a send-message operation and waits for that operation
    to complete before fetching the next request. When the iterator is
    exhausted it submits a send-close-from-client operation. If iterating or
    serializing fails, the call is cancelled and the state aborted.

    Args:
      request_iterator: Iterator yielding the request values.
      state: The _RPCState of the RPC.
      call: The cygrpc call on which operations are submitted.
      request_serializer: Serializer passed to _common.serialize.
      event_handler: Tag passed to call.operate for each submitted batch; the
        blocking stream-unary path in this file passes None.
    """
    # With fork support enabled the wait below uses a 1 second timeout so the
    # loop wakes periodically and reaches block_if_fork_in_progress;
    # otherwise it waits until notified.
    if cygrpc.is_fork_support_enabled():
        condition_wait_timeout = 1.0
    else:
        condition_wait_timeout = None

    def consume_request_iterator():  # pylint: disable=too-many-branches
        """Thread body: send every request, then half-close the call."""
        while True:
            # Ensures return_from_user_request_generator is called exactly
            # once per enter_user_request_generator, whichever path is taken.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Iterator exhausted: leave the loop and half-close below.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without state.condition held and without
                # notifying waiters — confirm this is intended.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                # Only keep sending while the RPC is live and not cancelled.
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if operating:
                            state.due.add(cygrpc.OperationType.send_message)
                        else:
                            # The operation was not started; stop sending.
                            return
                        # Wait until this send has completed (send_message is
                        # no longer due) or the RPC has terminated.
                        while True:
                            state.condition.wait(condition_wait_timeout)
                            cygrpc.block_if_fork_in_progress(state)
                            if state.code is None:
                                if cygrpc.OperationType.send_message not in state.due:
                                    break
                            else:
                                return
                else:
                    return
        # All requests were sent: half-close, unless the RPC already ended.
        with state.condition:
            if state.code is None:
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if operating:
                    state.due.add(cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
    249 
    250 
    251 class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
    252 
    253     def __init__(self, state, call, response_deserializer, deadline):
    254         super(_Rendezvous, self).__init__()
    255         self._state = state
    256         self._call = call
    257         self._response_deserializer = response_deserializer
    258         self._deadline = deadline
    259 
    260     def cancel(self):
    261         with self._state.condition:
    262             if self._state.code is None:
    263                 code = grpc.StatusCode.CANCELLED
    264                 details = 'Locally cancelled by application!'
    265                 self._call.cancel(
    266                     _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
    267                 self._state.cancelled = True
    268                 _abort(self._state, code, details)
    269                 self._state.condition.notify_all()
    270             return False
    271 
    272     def cancelled(self):
    273         with self._state.condition:
    274             return self._state.cancelled
    275 
    276     def running(self):
    277         with self._state.condition:
    278             return self._state.code is None
    279 
    280     def done(self):
    281         with self._state.condition:
    282             return self._state.code is not None
    283 
    284     def result(self, timeout=None):
    285         until = None if timeout is None else time.time() + timeout
    286         with self._state.condition:
    287             while True:
    288                 if self._state.code is None:
    289                     _wait_once_until(self._state.condition, until)
    290                 elif self._state.code is grpc.StatusCode.OK:
    291                     return self._state.response
    292                 elif self._state.cancelled:
    293                     raise grpc.FutureCancelledError()
    294                 else:
    295                     raise self
    296 
    297     def exception(self, timeout=None):
    298         until = None if timeout is None else time.time() + timeout
    299         with self._state.condition:
    300             while True:
    301                 if self._state.code is None:
    302                     _wait_once_until(self._state.condition, until)
    303                 elif self._state.code is grpc.StatusCode.OK:
    304                     return None
    305                 elif self._state.cancelled:
    306                     raise grpc.FutureCancelledError()
    307                 else:
    308                     return self
    309 
    310     def traceback(self, timeout=None):
    311         until = None if timeout is None else time.time() + timeout
    312         with self._state.condition:
    313             while True:
    314                 if self._state.code is None:
    315                     _wait_once_until(self._state.condition, until)
    316                 elif self._state.code is grpc.StatusCode.OK:
    317                     return None
    318                 elif self._state.cancelled:
    319                     raise grpc.FutureCancelledError()
    320                 else:
    321                     try:
    322                         raise self
    323                     except grpc.RpcError:
    324                         return sys.exc_info()[2]
    325 
    326     def add_done_callback(self, fn):
    327         with self._state.condition:
    328             if self._state.code is None:
    329                 self._state.callbacks.append(lambda: fn(self))
    330                 return
    331 
    332         fn(self)
    333 
    334     def _next(self):
    335         with self._state.condition:
    336             if self._state.code is None:
    337                 event_handler = _event_handler(self._state,
    338                                                self._response_deserializer)
    339                 operating = self._call.operate(
    340                     (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
    341                     event_handler)
    342                 if operating:
    343                     self._state.due.add(cygrpc.OperationType.receive_message)
    344             elif self._state.code is grpc.StatusCode.OK:
    345                 raise StopIteration()
    346             else:
    347                 raise self
    348             while True:
    349                 self._state.condition.wait()
    350                 if self._state.response is not None:
    351                     response = self._state.response
    352                     self._state.response = None
    353                     return response
    354                 elif cygrpc.OperationType.receive_message not in self._state.due:
    355                     if self._state.code is grpc.StatusCode.OK:
    356                         raise StopIteration()
    357                     elif self._state.code is not None:
    358                         raise self
    359 
    360     def __iter__(self):
    361         return self
    362 
    363     def __next__(self):
    364         return self._next()
    365 
    366     def next(self):
    367         return self._next()
    368 
    369     def is_active(self):
    370         with self._state.condition:
    371             return self._state.code is None
    372 
    373     def time_remaining(self):
    374         if self._deadline is None:
    375             return None
    376         else:
    377             return max(self._deadline - time.time(), 0)
    378 
    379     def add_callback(self, callback):
    380         with self._state.condition:
    381             if self._state.callbacks is None:
    382                 return False
    383             else:
    384                 self._state.callbacks.append(callback)
    385                 return True
    386 
    387     def initial_metadata(self):
    388         with self._state.condition:
    389             while self._state.initial_metadata is None:
    390                 self._state.condition.wait()
    391             return self._state.initial_metadata
    392 
    393     def trailing_metadata(self):
    394         with self._state.condition:
    395             while self._state.trailing_metadata is None:
    396                 self._state.condition.wait()
    397             return self._state.trailing_metadata
    398 
    399     def code(self):
    400         with self._state.condition:
    401             while self._state.code is None:
    402                 self._state.condition.wait()
    403             return self._state.code
    404 
    405     def details(self):
    406         with self._state.condition:
    407             while self._state.details is None:
    408                 self._state.condition.wait()
    409             return _common.decode(self._state.details)
    410 
    411     def debug_error_string(self):
    412         with self._state.condition:
    413             while self._state.debug_error_string is None:
    414                 self._state.condition.wait()
    415             return _common.decode(self._state.debug_error_string)
    416 
    417     def _repr(self):
    418         with self._state.condition:
    419             if self._state.code is None:
    420                 return '<_Rendezvous object of in-flight RPC>'
    421             elif self._state.code is grpc.StatusCode.OK:
    422                 return _OK_RENDEZVOUS_REPR_FORMAT.format(
    423                     self._state.code, self._state.details)
    424             else:
    425                 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
    426                     self._state.code, self._state.details,
    427                     self._state.debug_error_string)
    428 
    429     def __repr__(self):
    430         return self._repr()
    431 
    432     def __str__(self):
    433         return self._repr()
    434 
    435     def __del__(self):
    436         with self._state.condition:
    437             if self._state.code is None:
    438                 self._state.code = grpc.StatusCode.CANCELLED
    439                 self._state.details = 'Cancelled upon garbage collection!'
    440                 self._state.cancelled = True
    441                 self._call.cancel(
    442                     _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
    443                     self._state.details)
    444                 self._state.condition.notify_all()
    445 
    446 
    447 def _start_unary_request(request, timeout, request_serializer):
    448     deadline = _deadline(timeout)
    449     serialized_request = _common.serialize(request, request_serializer)
    450     if serialized_request is None:
    451         state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
    452                           'Exception serializing request!')
    453         rendezvous = _Rendezvous(state, None, None, deadline)
    454         return deadline, None, rendezvous
    455     else:
    456         return deadline, serialized_request, None
    457 
    458 
    459 def _end_unary_response_blocking(state, call, with_call, deadline):
    460     if state.code is grpc.StatusCode.OK:
    461         if with_call:
    462             rendezvous = _Rendezvous(state, call, None, deadline)
    463             return state.response, rendezvous
    464         else:
    465             return state.response
    466     else:
    467         raise _Rendezvous(state, None, None, deadline)
    468 
    469 
    470 def _stream_unary_invocation_operationses(metadata):
    471     return (
    472         (
    473             cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
    474             cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
    475             cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    476         ),
    477         (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
    478     )
    479 
    480 
    481 def _stream_unary_invocation_operationses_and_tags(metadata):
    482     return tuple((
    483         operations,
    484         None,
    485     ) for operations in _stream_unary_invocation_operationses(metadata))
    486 
    487 
    488 class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    489 
    490     def __init__(self, channel, managed_call, method, request_serializer,
    491                  response_deserializer):
    492         self._channel = channel
    493         self._managed_call = managed_call
    494         self._method = method
    495         self._request_serializer = request_serializer
    496         self._response_deserializer = response_deserializer
    497 
    498     def _prepare(self, request, timeout, metadata):
    499         deadline, serialized_request, rendezvous = _start_unary_request(
    500             request, timeout, self._request_serializer)
    501         if serialized_request is None:
    502             return None, None, None, rendezvous
    503         else:
    504             state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
    505             operations = (
    506                 cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
    507                 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
    508                 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
    509                 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    510                 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
    511                 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    512             )
    513             return state, operations, deadline, None
    514 
    515     def _blocking(self, request, timeout, metadata, credentials):
    516         state, operations, deadline, rendezvous = self._prepare(
    517             request, timeout, metadata)
    518         if state is None:
    519             raise rendezvous
    520         else:
    521             call = self._channel.segregated_call(
    522                 0, self._method, None, deadline, metadata, None
    523                 if credentials is None else credentials._credentials, ((
    524                     operations,
    525                     None,
    526                 ),))
    527             event = call.next_event()
    528             _handle_event(event, state, self._response_deserializer)
    529             return state, call,
    530 
    531     def __call__(self, request, timeout=None, metadata=None, credentials=None):
    532         state, call, = self._blocking(request, timeout, metadata, credentials)
    533         return _end_unary_response_blocking(state, call, False, None)
    534 
    535     def with_call(self, request, timeout=None, metadata=None, credentials=None):
    536         state, call, = self._blocking(request, timeout, metadata, credentials)
    537         return _end_unary_response_blocking(state, call, True, None)
    538 
    539     def future(self, request, timeout=None, metadata=None, credentials=None):
    540         state, operations, deadline, rendezvous = self._prepare(
    541             request, timeout, metadata)
    542         if state is None:
    543             raise rendezvous
    544         else:
    545             event_handler = _event_handler(state, self._response_deserializer)
    546             call = self._managed_call(
    547                 0, self._method, None, deadline, metadata, None
    548                 if credentials is None else credentials._credentials,
    549                 (operations,), event_handler)
    550             return _Rendezvous(state, call, self._response_deserializer,
    551                                deadline)
    552 
    553 
    554 class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    555 
    556     def __init__(self, channel, managed_call, method, request_serializer,
    557                  response_deserializer):
    558         self._channel = channel
    559         self._managed_call = managed_call
    560         self._method = method
    561         self._request_serializer = request_serializer
    562         self._response_deserializer = response_deserializer
    563 
    564     def __call__(self, request, timeout=None, metadata=None, credentials=None):
    565         deadline, serialized_request, rendezvous = _start_unary_request(
    566             request, timeout, self._request_serializer)
    567         if serialized_request is None:
    568             raise rendezvous
    569         else:
    570             state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
    571             operationses = (
    572                 (
    573                     cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
    574                     cygrpc.SendMessageOperation(serialized_request,
    575                                                 _EMPTY_FLAGS),
    576                     cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
    577                     cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    578                 ),
    579                 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
    580             )
    581             event_handler = _event_handler(state, self._response_deserializer)
    582             call = self._managed_call(
    583                 0, self._method, None, deadline, metadata, None
    584                 if credentials is None else credentials._credentials,
    585                 operationses, event_handler)
    586             return _Rendezvous(state, call, self._response_deserializer,
    587                                deadline)
    588 
    589 
    590 class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    591 
    592     def __init__(self, channel, managed_call, method, request_serializer,
    593                  response_deserializer):
    594         self._channel = channel
    595         self._managed_call = managed_call
    596         self._method = method
    597         self._request_serializer = request_serializer
    598         self._response_deserializer = response_deserializer
    599 
    600     def _blocking(self, request_iterator, timeout, metadata, credentials):
    601         deadline = _deadline(timeout)
    602         state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
    603         call = self._channel.segregated_call(
    604             0, self._method, None, deadline, metadata, None
    605             if credentials is None else credentials._credentials,
    606             _stream_unary_invocation_operationses_and_tags(metadata))
    607         _consume_request_iterator(request_iterator, state, call,
    608                                   self._request_serializer, None)
    609         while True:
    610             event = call.next_event()
    611             with state.condition:
    612                 _handle_event(event, state, self._response_deserializer)
    613                 state.condition.notify_all()
    614                 if not state.due:
    615                     break
    616         return state, call,
    617 
    618     def __call__(self,
    619                  request_iterator,
    620                  timeout=None,
    621                  metadata=None,
    622                  credentials=None):
    623         state, call, = self._blocking(request_iterator, timeout, metadata,
    624                                       credentials)
    625         return _end_unary_response_blocking(state, call, False, None)
    626 
    627     def with_call(self,
    628                   request_iterator,
    629                   timeout=None,
    630                   metadata=None,
    631                   credentials=None):
    632         state, call, = self._blocking(request_iterator, timeout, metadata,
    633                                       credentials)
    634         return _end_unary_response_blocking(state, call, True, None)
    635 
    636     def future(self,
    637                request_iterator,
    638                timeout=None,
    639                metadata=None,
    640                credentials=None):
    641         deadline = _deadline(timeout)
    642         state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
    643         event_handler = _event_handler(state, self._response_deserializer)
    644         call = self._managed_call(
    645             0, self._method, None, deadline, metadata, None
    646             if credentials is None else credentials._credentials,
    647             _stream_unary_invocation_operationses(metadata), event_handler)
    648         _consume_request_iterator(request_iterator, state, call,
    649                                   self._request_serializer, event_handler)
    650         return _Rendezvous(state, call, self._response_deserializer, deadline)
    651 
    652 
    653 class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    654 
    655     def __init__(self, channel, managed_call, method, request_serializer,
    656                  response_deserializer):
    657         self._channel = channel
    658         self._managed_call = managed_call
    659         self._method = method
    660         self._request_serializer = request_serializer
    661         self._response_deserializer = response_deserializer
    662 
    663     def __call__(self,
    664                  request_iterator,
    665                  timeout=None,
    666                  metadata=None,
    667                  credentials=None):
    668         deadline = _deadline(timeout)
    669         state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
    670         operationses = (
    671             (
    672                 cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
    673                 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    674             ),
    675             (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
    676         )
    677         event_handler = _event_handler(state, self._response_deserializer)
    678         call = self._managed_call(
    679             0, self._method, None, deadline, metadata, None
    680             if credentials is None else credentials._credentials, operationses,
    681             event_handler)
    682         _consume_request_iterator(request_iterator, state, call,
    683                                   self._request_serializer, event_handler)
    684         return _Rendezvous(state, call, self._response_deserializer, deadline)
    685 
    686 
    687 class _ChannelCallState(object):
    688 
    689     def __init__(self, channel):
    690         self.lock = threading.Lock()
    691         self.channel = channel
    692         self.managed_calls = 0
    693         self.threading = False
    694 
    695     def reset_postfork_child(self):
    696         self.managed_calls = 0
    697 
    698 
    699 def _run_channel_spin_thread(state):
    700 
    701     def channel_spin():
    702         while True:
    703             cygrpc.block_if_fork_in_progress(state)
    704             event = state.channel.next_call_event()
    705             if event.completion_type == cygrpc.CompletionType.queue_timeout:
    706                 continue
    707             call_completed = event.tag(event)
    708             if call_completed:
    709                 with state.lock:
    710                     state.managed_calls -= 1
    711                     if state.managed_calls == 0:
    712                         return
    713 
    714     channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    715     channel_spin_thread.setDaemon(True)
    716     channel_spin_thread.start()
    717 
    718 
    719 def _channel_managed_call_management(state):
    720 
    721     # pylint: disable=too-many-arguments
    722     def create(flags, method, host, deadline, metadata, credentials,
    723                operationses, event_handler):
    724         """Creates a cygrpc.IntegratedCall.
    725 
    726         Args:
    727           flags: An integer bitfield of call flags.
    728           method: The RPC method.
    729           host: A host string for the created call.
    730           deadline: A float to be the deadline of the created call or None if
    731             the call is to have an infinite deadline.
    732           metadata: The metadata for the call or None.
    733           credentials: A cygrpc.CallCredentials or None.
    734           operationses: An iterable of iterables of cygrpc.Operations to be
    735             started on the call.
    736           event_handler: A behavior to call to handle the events resultant from
    737             the operations on the call.
    738 
    739         Returns:
    740           A cygrpc.IntegratedCall with which to conduct an RPC.
    741         """
    742         operationses_and_tags = tuple((
    743             operations,
    744             event_handler,
    745         ) for operations in operationses)
    746         with state.lock:
    747             call = state.channel.integrated_call(flags, method, host, deadline,
    748                                                  metadata, credentials,
    749                                                  operationses_and_tags)
    750             if state.managed_calls == 0:
    751                 state.managed_calls = 1
    752                 _run_channel_spin_thread(state)
    753             else:
    754                 state.managed_calls += 1
    755             return call
    756 
    757     return create
    758 
    759 
    760 class _ChannelConnectivityState(object):
    761 
    762     def __init__(self, channel):
    763         self.lock = threading.RLock()
    764         self.channel = channel
    765         self.polling = False
    766         self.connectivity = None
    767         self.try_to_connect = False
    768         self.callbacks_and_connectivities = []
    769         self.delivering = False
    770 
    771     def reset_postfork_child(self):
    772         self.polling = False
    773         self.connectivity = None
    774         self.try_to_connect = False
    775         self.callbacks_and_connectivities = []
    776         self.delivering = False
    777 
    778 
    779 def _deliveries(state):
    780     callbacks_needing_update = []
    781     for callback_and_connectivity in state.callbacks_and_connectivities:
    782         callback, callback_connectivity, = callback_and_connectivity
    783         if callback_connectivity is not state.connectivity:
    784             callbacks_needing_update.append(callback)
    785             callback_and_connectivity[1] = state.connectivity
    786     return callbacks_needing_update
    787 
    788 
    789 def _deliver(state, initial_connectivity, initial_callbacks):
    790     connectivity = initial_connectivity
    791     callbacks = initial_callbacks
    792     while True:
    793         for callback in callbacks:
    794             cygrpc.block_if_fork_in_progress(state)
    795             callable_util.call_logging_exceptions(
    796                 callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
    797                 connectivity)
    798         with state.lock:
    799             callbacks = _deliveries(state)
    800             if callbacks:
    801                 connectivity = state.connectivity
    802             else:
    803                 state.delivering = False
    804                 return
    805 
    806 
    807 def _spawn_delivery(state, callbacks):
    808     delivering_thread = cygrpc.ForkManagedThread(
    809         target=_deliver, args=(
    810             state,
    811             state.connectivity,
    812             callbacks,
    813         ))
    814     delivering_thread.start()
    815     state.delivering = True
    816 
    817 
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(state, channel, initial_try_to_connect):
    """Polls the channel's connectivity and spawns deliveries to subscribers.

    Loops until a pass finds no subscriptions and no pending connect request,
    at which point polling is marked stopped and the function returns.

    Args:
      state: The connectivity state shared with the subscribers; its fields
        are only read and written here while holding state.lock.
      channel: The channel whose check_connectivity_state and
        watch_connectivity_state methods are called.
      initial_try_to_connect: Passed to the first check_connectivity_state
        call.
    """
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        # Publish the first observed connectivity, translated through the
        # _common mapping.
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                connectivity])
        # Every current subscriber is notified; each subscription is marked
        # as having been given this connectivity before the delivery starts.
        callbacks = tuple(callback
                          for callback, unused_but_known_to_be_none_connectivity
                          in state.callbacks_and_connectivities)
        for callback_and_connectivity in state.callbacks_and_connectivities:
            callback_and_connectivity[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)
    while True:
        # NOTE(review): the second argument looks like a deadline 0.2 seconds
        # from now for a change away from `connectivity` - confirm against
        # cygrpc.
        event = channel.watch_connectivity_state(connectivity,
                                                 time.time() + 0.2)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not state.callbacks_and_connectivities and not state.try_to_connect:
                # Nobody is subscribed and no connect was requested: stop.
                state.polling = False
                state.connectivity = None
                break
            # Take the pending connect request, if any, and clear it.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if event.success or try_to_connect:
            connectivity = channel.check_connectivity_state(try_to_connect)
            with state.lock:
                state.connectivity = (
                    _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                        connectivity])
                # A delivery already in progress re-checks _deliveries itself
                # before it finishes, so only start one when none is running.
                if not state.delivering:
                    callbacks = _deliveries(state)
                    if callbacks:
                        _spawn_delivery(state, callbacks)
    854 
    855 
    856 def _moot(state):
    857     with state.lock:
    858         del state.callbacks_and_connectivities[:]
    859 
    860 
    861 def _subscribe(state, callback, try_to_connect):
    862     with state.lock:
    863         if not state.callbacks_and_connectivities and not state.polling:
    864             polling_thread = cygrpc.ForkManagedThread(
    865                 target=_poll_connectivity,
    866                 args=(state, state.channel, bool(try_to_connect)))
    867             polling_thread.setDaemon(True)
    868             polling_thread.start()
    869             state.polling = True
    870             state.callbacks_and_connectivities.append([callback, None])
    871         elif not state.delivering and state.connectivity is not None:
    872             _spawn_delivery(state, (callback,))
    873             state.try_to_connect |= bool(try_to_connect)
    874             state.callbacks_and_connectivities.append(
    875                 [callback, state.connectivity])
    876         else:
    877             state.try_to_connect |= bool(try_to_connect)
    878             state.callbacks_and_connectivities.append([callback, None])
    879 
    880 
    881 def _unsubscribe(state, callback):
    882     with state.lock:
    883         for index, (subscribed_callback, unused_connectivity) in enumerate(
    884                 state.callbacks_and_connectivities):
    885             if callback == subscribed_callback:
    886                 state.callbacks_and_connectivities.pop(index)
    887                 break
    888 
    889 
    890 def _options(options):
    891     return list(options) + [
    892         (
    893             cygrpc.ChannelArgKey.primary_user_agent_string,
    894             _USER_AGENT,
    895         ),
    896     ]
    897 
    898 
    899 class Channel(grpc.Channel):
    900     """A cygrpc.Channel-backed implementation of grpc.Channel."""
    901 
    902     def __init__(self, target, options, credentials):
    903         """Constructor.
    904 
    905         Args:
    906           target: The target to which to connect.
    907           options: Configuration options for the channel.
    908           credentials: A cygrpc.ChannelCredentials or None.
    909         """
    910         self._channel = cygrpc.Channel(
    911             _common.encode(target), _options(options), credentials)
    912         self._call_state = _ChannelCallState(self._channel)
    913         self._connectivity_state = _ChannelConnectivityState(self._channel)
    914         cygrpc.fork_register_channel(self)
    915 
    916     def subscribe(self, callback, try_to_connect=None):
    917         _subscribe(self._connectivity_state, callback, try_to_connect)
    918 
    919     def unsubscribe(self, callback):
    920         _unsubscribe(self._connectivity_state, callback)
    921 
    922     def unary_unary(self,
    923                     method,
    924                     request_serializer=None,
    925                     response_deserializer=None):
    926         return _UnaryUnaryMultiCallable(
    927             self._channel, _channel_managed_call_management(self._call_state),
    928             _common.encode(method), request_serializer, response_deserializer)
    929 
    930     def unary_stream(self,
    931                      method,
    932                      request_serializer=None,
    933                      response_deserializer=None):
    934         return _UnaryStreamMultiCallable(
    935             self._channel, _channel_managed_call_management(self._call_state),
    936             _common.encode(method), request_serializer, response_deserializer)
    937 
    938     def stream_unary(self,
    939                      method,
    940                      request_serializer=None,
    941                      response_deserializer=None):
    942         return _StreamUnaryMultiCallable(
    943             self._channel, _channel_managed_call_management(self._call_state),
    944             _common.encode(method), request_serializer, response_deserializer)
    945 
    946     def stream_stream(self,
    947                       method,
    948                       request_serializer=None,
    949                       response_deserializer=None):
    950         return _StreamStreamMultiCallable(
    951             self._channel, _channel_managed_call_management(self._call_state),
    952             _common.encode(method), request_serializer, response_deserializer)
    953 
    954     def _close(self):
    955         self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
    956         _moot(self._connectivity_state)
    957 
    958     def _close_on_fork(self):
    959         self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
    960                                     'Channel closed due to fork')
    961         _moot(self._connectivity_state)
    962 
    963     def __enter__(self):
    964         return self
    965 
    966     def __exit__(self, exc_type, exc_val, exc_tb):
    967         self._close()
    968         return False
    969 
    970     def close(self):
    971         self._close()
    972 
    973     def __del__(self):
    974         # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
    975         # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
    976         # here (or more likely, call self._close() here). We don't do this today
    977         # because many valid use cases today allow the channel to be deleted
    978         # immediately after stubs are created. After a sufficient period of time
    979         # has passed for all users to be trusted to hang out to their channels
    980         # for as long as they are in use and to close them after using them,
    981         # then deletion of this grpc._channel.Channel instance can be made to
    982         # effect closure of the underlying cygrpc.Channel instance.
    983         cygrpc.fork_unregister_channel(self)
    984         _moot(self._connectivity_state)
    985