Home | History | Annotate | Download | only in fork
      1 # Copyright 2018 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 """Implementations of fork support test methods."""
     15 
     16 import enum
     17 import json
     18 import logging
     19 import multiprocessing
     20 import os
     21 import threading
     22 import time
     23 
     24 import grpc
     25 
     26 from six.moves import queue
     27 
     28 from src.proto.grpc.testing import empty_pb2
     29 from src.proto.grpc.testing import messages_pb2
     30 from src.proto.grpc.testing import test_pb2_grpc
     31 
     32 _LOGGER = logging.getLogger(__name__)
     33 
     34 
     35 def _channel(args):
     36     target = '{}:{}'.format(args.server_host, args.server_port)
     37     if args.use_tls:
     38         channel_credentials = grpc.ssl_channel_credentials()
     39         channel = grpc.secure_channel(target, channel_credentials)
     40     else:
     41         channel = grpc.insecure_channel(target)
     42     return channel
     43 
     44 
     45 def _validate_payload_type_and_length(response, expected_type, expected_length):
     46     if response.payload.type is not expected_type:
     47         raise ValueError('expected payload type %s, got %s' %
     48                          (expected_type, type(response.payload.type)))
     49     elif len(response.payload.body) != expected_length:
     50         raise ValueError('expected payload body size %d, got %d' %
     51                          (expected_length, len(response.payload.body)))
     52 
     53 
     54 def _async_unary(stub):
     55     size = 314159
     56     request = messages_pb2.SimpleRequest(
     57         response_type=messages_pb2.COMPRESSABLE,
     58         response_size=size,
     59         payload=messages_pb2.Payload(body=b'\x00' * 271828))
     60     response_future = stub.UnaryCall.future(request)
     61     response = response_future.result()
     62     _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
     63 
     64 
     65 def _blocking_unary(stub):
     66     size = 314159
     67     request = messages_pb2.SimpleRequest(
     68         response_type=messages_pb2.COMPRESSABLE,
     69         response_size=size,
     70         payload=messages_pb2.Payload(body=b'\x00' * 271828))
     71     response = stub.UnaryCall(request)
     72     _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
     73 
     74 
     75 class _Pipe(object):
     76 
     77     def __init__(self):
     78         self._condition = threading.Condition()
     79         self._values = []
     80         self._open = True
     81 
     82     def __iter__(self):
     83         return self
     84 
     85     def __next__(self):
     86         return self.next()
     87 
     88     def next(self):
     89         with self._condition:
     90             while not self._values and self._open:
     91                 self._condition.wait()
     92             if self._values:
     93                 return self._values.pop(0)
     94             else:
     95                 raise StopIteration()
     96 
     97     def add(self, value):
     98         with self._condition:
     99             self._values.append(value)
    100             self._condition.notify()
    101 
    102     def close(self):
    103         with self._condition:
    104             self._open = False
    105             self._condition.notify()
    106 
    107     def __enter__(self):
    108         return self
    109 
    110     def __exit__(self, type, value, traceback):
    111         self.close()
    112 
    113 
    114 class _ChildProcess(object):
    115 
    116     def __init__(self, task, args=None):
    117         if args is None:
    118             args = ()
    119         self._exceptions = multiprocessing.Queue()
    120 
    121         def record_exceptions():
    122             try:
    123                 task(*args)
    124             except Exception as e:  # pylint: disable=broad-except
    125                 self._exceptions.put(e)
    126 
    127         self._process = multiprocessing.Process(target=record_exceptions)
    128 
    129     def start(self):
    130         self._process.start()
    131 
    132     def finish(self):
    133         self._process.join()
    134         if self._process.exitcode != 0:
    135             raise ValueError('Child process failed with exitcode %d' %
    136                              self._process.exitcode)
    137         try:
    138             exception = self._exceptions.get(block=False)
    139             raise ValueError('Child process failed: %s' % exception)
    140         except queue.Empty:
    141             pass
    142 
    143 
    144 def _async_unary_same_channel(channel):
    145 
    146     def child_target():
    147         try:
    148             _async_unary(stub)
    149             raise Exception(
    150                 'Child should not be able to re-use channel after fork')
    151         except ValueError as expected_value_error:
    152             pass
    153 
    154     stub = test_pb2_grpc.TestServiceStub(channel)
    155     _async_unary(stub)
    156     child_process = _ChildProcess(child_target)
    157     child_process.start()
    158     _async_unary(stub)
    159     child_process.finish()
    160 
    161 
    162 def _async_unary_new_channel(channel, args):
    163 
    164     def child_target():
    165         child_channel = _channel(args)
    166         child_stub = test_pb2_grpc.TestServiceStub(child_channel)
    167         _async_unary(child_stub)
    168         child_channel.close()
    169 
    170     stub = test_pb2_grpc.TestServiceStub(channel)
    171     _async_unary(stub)
    172     child_process = _ChildProcess(child_target)
    173     child_process.start()
    174     _async_unary(stub)
    175     child_process.finish()
    176 
    177 
    178 def _blocking_unary_same_channel(channel):
    179 
    180     def child_target():
    181         try:
    182             _blocking_unary(stub)
    183             raise Exception(
    184                 'Child should not be able to re-use channel after fork')
    185         except ValueError as expected_value_error:
    186             pass
    187 
    188     stub = test_pb2_grpc.TestServiceStub(channel)
    189     _blocking_unary(stub)
    190     child_process = _ChildProcess(child_target)
    191     child_process.start()
    192     child_process.finish()
    193 
    194 
    195 def _blocking_unary_new_channel(channel, args):
    196 
    197     def child_target():
    198         child_channel = _channel(args)
    199         child_stub = test_pb2_grpc.TestServiceStub(child_channel)
    200         _blocking_unary(child_stub)
    201         child_channel.close()
    202 
    203     stub = test_pb2_grpc.TestServiceStub(channel)
    204     _blocking_unary(stub)
    205     child_process = _ChildProcess(child_target)
    206     child_process.start()
    207     _blocking_unary(stub)
    208     child_process.finish()
    209 
    210 
    211 # Verify that the fork channel registry can handle already closed channels
    212 def _close_channel_before_fork(channel, args):
    213 
    214     def child_target():
    215         new_channel.close()
    216         child_channel = _channel(args)
    217         child_stub = test_pb2_grpc.TestServiceStub(child_channel)
    218         _blocking_unary(child_stub)
    219         child_channel.close()
    220 
    221     stub = test_pb2_grpc.TestServiceStub(channel)
    222     _blocking_unary(stub)
    223     channel.close()
    224 
    225     new_channel = _channel(args)
    226     new_stub = test_pb2_grpc.TestServiceStub(new_channel)
    227     child_process = _ChildProcess(child_target)
    228     child_process.start()
    229     _blocking_unary(new_stub)
    230     child_process.finish()
    231 
    232 
    233 def _connectivity_watch(channel, args):
    234 
    235     def child_target():
    236 
    237         def child_connectivity_callback(state):
    238             child_states.append(state)
    239 
    240         child_states = []
    241         child_channel = _channel(args)
    242         child_stub = test_pb2_grpc.TestServiceStub(child_channel)
    243         child_channel.subscribe(child_connectivity_callback)
    244         _async_unary(child_stub)
    245         if len(child_states
    246               ) < 2 or child_states[-1] != grpc.ChannelConnectivity.READY:
    247             raise ValueError('Channel did not move to READY')
    248         if len(parent_states) > 1:
    249             raise ValueError('Received connectivity updates on parent callback')
    250         child_channel.unsubscribe(child_connectivity_callback)
    251         child_channel.close()
    252 
    253     def parent_connectivity_callback(state):
    254         parent_states.append(state)
    255 
    256     parent_states = []
    257     channel.subscribe(parent_connectivity_callback)
    258     stub = test_pb2_grpc.TestServiceStub(channel)
    259     child_process = _ChildProcess(child_target)
    260     child_process.start()
    261     _async_unary(stub)
    262     if len(parent_states
    263           ) < 2 or parent_states[-1] != grpc.ChannelConnectivity.READY:
    264         raise ValueError('Channel did not move to READY')
    265     channel.unsubscribe(parent_connectivity_callback)
    266     child_process.finish()
    267 
    268     # Need to unsubscribe or _channel.py in _poll_connectivity triggers a
    269     # "Cannot invoke RPC on closed channel!" error.
    270     # TODO(ericgribkoff) Fix issue with channel.close() and connectivity polling
    271     channel.unsubscribe(parent_connectivity_callback)
    272 
    273 
    274 def _ping_pong_with_child_processes_after_first_response(
    275         channel, args, child_target, run_after_close=True):
    276     request_response_sizes = (
    277         31415,
    278         9,
    279         2653,
    280         58979,
    281     )
    282     request_payload_sizes = (
    283         27182,
    284         8,
    285         1828,
    286         45904,
    287     )
    288     stub = test_pb2_grpc.TestServiceStub(channel)
    289     pipe = _Pipe()
    290     parent_bidi_call = stub.FullDuplexCall(pipe)
    291     child_processes = []
    292     first_message_received = False
    293     for response_size, payload_size in zip(request_response_sizes,
    294                                            request_payload_sizes):
    295         request = messages_pb2.StreamingOutputCallRequest(
    296             response_type=messages_pb2.COMPRESSABLE,
    297             response_parameters=(
    298                 messages_pb2.ResponseParameters(size=response_size),),
    299             payload=messages_pb2.Payload(body=b'\x00' * payload_size))
    300         pipe.add(request)
    301         if first_message_received:
    302             child_process = _ChildProcess(child_target,
    303                                           (parent_bidi_call, channel, args))
    304             child_process.start()
    305             child_processes.append(child_process)
    306         response = next(parent_bidi_call)
    307         first_message_received = True
    308         child_process = _ChildProcess(child_target,
    309                                       (parent_bidi_call, channel, args))
    310         child_process.start()
    311         child_processes.append(child_process)
    312         _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
    313                                           response_size)
    314     pipe.close()
    315     if run_after_close:
    316         child_process = _ChildProcess(child_target,
    317                                       (parent_bidi_call, channel, args))
    318         child_process.start()
    319         child_processes.append(child_process)
    320     for child_process in child_processes:
    321         child_process.finish()
    322 
    323 
    324 def _in_progress_bidi_continue_call(channel):
    325 
    326     def child_target(parent_bidi_call, parent_channel, args):
    327         stub = test_pb2_grpc.TestServiceStub(parent_channel)
    328         try:
    329             _async_unary(stub)
    330             raise Exception(
    331                 'Child should not be able to re-use channel after fork')
    332         except ValueError as expected_value_error:
    333             pass
    334         inherited_code = parent_bidi_call.code()
    335         inherited_details = parent_bidi_call.details()
    336         if inherited_code != grpc.StatusCode.CANCELLED:
    337             raise ValueError(
    338                 'Expected inherited code CANCELLED, got %s' % inherited_code)
    339         if inherited_details != 'Channel closed due to fork':
    340             raise ValueError(
    341                 'Expected inherited details Channel closed due to fork, got %s'
    342                 % inherited_details)
    343 
    344     # Don't run child_target after closing the parent call, as the call may have
    345     # received a status from the  server before fork occurs.
    346     _ping_pong_with_child_processes_after_first_response(
    347         channel, None, child_target, run_after_close=False)
    348 
    349 
    350 def _in_progress_bidi_same_channel_async_call(channel):
    351 
    352     def child_target(parent_bidi_call, parent_channel, args):
    353         stub = test_pb2_grpc.TestServiceStub(parent_channel)
    354         try:
    355             _async_unary(stub)
    356             raise Exception(
    357                 'Child should not be able to re-use channel after fork')
    358         except ValueError as expected_value_error:
    359             pass
    360 
    361     _ping_pong_with_child_processes_after_first_response(
    362         channel, None, child_target)
    363 
    364 
    365 def _in_progress_bidi_same_channel_blocking_call(channel):
    366 
    367     def child_target(parent_bidi_call, parent_channel, args):
    368         stub = test_pb2_grpc.TestServiceStub(parent_channel)
    369         try:
    370             _blocking_unary(stub)
    371             raise Exception(
    372                 'Child should not be able to re-use channel after fork')
    373         except ValueError as expected_value_error:
    374             pass
    375 
    376     _ping_pong_with_child_processes_after_first_response(
    377         channel, None, child_target)
    378 
    379 
    380 def _in_progress_bidi_new_channel_async_call(channel, args):
    381 
    382     def child_target(parent_bidi_call, parent_channel, args):
    383         channel = _channel(args)
    384         stub = test_pb2_grpc.TestServiceStub(channel)
    385         _async_unary(stub)
    386 
    387     _ping_pong_with_child_processes_after_first_response(
    388         channel, args, child_target)
    389 
    390 
    391 def _in_progress_bidi_new_channel_blocking_call(channel, args):
    392 
    393     def child_target(parent_bidi_call, parent_channel, args):
    394         channel = _channel(args)
    395         stub = test_pb2_grpc.TestServiceStub(channel)
    396         _blocking_unary(stub)
    397 
    398     _ping_pong_with_child_processes_after_first_response(
    399         channel, args, child_target)
    400 
    401 
    402 @enum.unique
    403 class TestCase(enum.Enum):
    404 
    405     CONNECTIVITY_WATCH = 'connectivity_watch'
    406     CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
    407     ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
    408     ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
    409     BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
    410     BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
    411     IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
    412     IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
    413     IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
    414     IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
    415     IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'
    416 
    417     def run_test(self, args):
    418         _LOGGER.info("Running %s", self)
    419         channel = _channel(args)
    420         if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
    421             _async_unary_same_channel(channel)
    422         elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
    423             _async_unary_new_channel(channel, args)
    424         elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
    425             _blocking_unary_same_channel(channel)
    426         elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
    427             _blocking_unary_new_channel(channel, args)
    428         elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
    429             _close_channel_before_fork(channel, args)
    430         elif self is TestCase.CONNECTIVITY_WATCH:
    431             _connectivity_watch(channel, args)
    432         elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
    433             _in_progress_bidi_continue_call(channel)
    434         elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
    435             _in_progress_bidi_same_channel_async_call(channel)
    436         elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
    437             _in_progress_bidi_same_channel_blocking_call(channel)
    438         elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
    439             _in_progress_bidi_new_channel_async_call(channel, args)
    440         elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
    441             _in_progress_bidi_new_channel_blocking_call(channel, args)
    442         else:
    443             raise NotImplementedError(
    444                 'Test case "%s" not implemented!' % self.name)
    445         channel.close()
    446