Home | History | Annotate | Download | only in _channel
      1 # Copyright 2017 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 import threading
     16 
     17 import grpc
     18 from grpc_testing import _common
     19 
     20 
     21 class State(_common.ChannelRpcHandler):
     22 
     23     def __init__(self, invocation_metadata, requests, requests_closed):
     24         self._condition = threading.Condition()
     25         self._invocation_metadata = invocation_metadata
     26         self._requests = requests
     27         self._requests_closed = requests_closed
     28         self._initial_metadata = None
     29         self._responses = []
     30         self._trailing_metadata = None
     31         self._code = None
     32         self._details = None
     33 
     34     def initial_metadata(self):
     35         with self._condition:
     36             while True:
     37                 if self._initial_metadata is None:
     38                     if self._code is None:
     39                         self._condition.wait()
     40                     else:
     41                         return _common.FUSSED_EMPTY_METADATA
     42                 else:
     43                     return self._initial_metadata
     44 
     45     def add_request(self, request):
     46         with self._condition:
     47             if self._code is None and not self._requests_closed:
     48                 self._requests.append(request)
     49                 self._condition.notify_all()
     50                 return True
     51             else:
     52                 return False
     53 
     54     def close_requests(self):
     55         with self._condition:
     56             if self._code is None and not self._requests_closed:
     57                 self._requests_closed = True
     58                 self._condition.notify_all()
     59 
     60     def take_response(self):
     61         with self._condition:
     62             while True:
     63                 if self._code is grpc.StatusCode.OK:
     64                     if self._responses:
     65                         response = self._responses.pop(0)
     66                         return _common.ChannelRpcRead(response, None, None,
     67                                                       None)
     68                     else:
     69                         return _common.ChannelRpcRead(
     70                             None, self._trailing_metadata, grpc.StatusCode.OK,
     71                             self._details)
     72                 elif self._code is None:
     73                     if self._responses:
     74                         response = self._responses.pop(0)
     75                         return _common.ChannelRpcRead(response, None, None,
     76                                                       None)
     77                     else:
     78                         self._condition.wait()
     79                 else:
     80                     return _common.ChannelRpcRead(None, self._trailing_metadata,
     81                                                   self._code, self._details)
     82 
     83     def termination(self):
     84         with self._condition:
     85             while True:
     86                 if self._code is None:
     87                     self._condition.wait()
     88                 else:
     89                     return self._trailing_metadata, self._code, self._details
     90 
     91     def cancel(self, code, details):
     92         with self._condition:
     93             if self._code is None:
     94                 if self._initial_metadata is None:
     95                     self._initial_metadata = _common.FUSSED_EMPTY_METADATA
     96                 self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
     97                 self._code = code
     98                 self._details = details
     99                 self._condition.notify_all()
    100                 return True
    101             else:
    102                 return False
    103 
    104     def take_invocation_metadata(self):
    105         with self._condition:
    106             if self._invocation_metadata is None:
    107                 raise ValueError('Expected invocation metadata!')
    108             else:
    109                 invocation_metadata = self._invocation_metadata
    110                 self._invocation_metadata = None
    111                 return invocation_metadata
    112 
    113     def take_invocation_metadata_and_request(self):
    114         with self._condition:
    115             if self._invocation_metadata is None:
    116                 raise ValueError('Expected invocation metadata!')
    117             elif not self._requests:
    118                 raise ValueError('Expected at least one request!')
    119             else:
    120                 invocation_metadata = self._invocation_metadata
    121                 self._invocation_metadata = None
    122                 return invocation_metadata, self._requests.pop(0)
    123 
    124     def send_initial_metadata(self, initial_metadata):
    125         with self._condition:
    126             self._initial_metadata = _common.fuss_with_metadata(
    127                 initial_metadata)
    128             self._condition.notify_all()
    129 
    130     def take_request(self):
    131         with self._condition:
    132             while True:
    133                 if self._requests:
    134                     return self._requests.pop(0)
    135                 else:
    136                     self._condition.wait()
    137 
    138     def requests_closed(self):
    139         with self._condition:
    140             while True:
    141                 if self._requests_closed:
    142                     return
    143                 else:
    144                     self._condition.wait()
    145 
    146     def send_response(self, response):
    147         with self._condition:
    148             if self._code is None:
    149                 self._responses.append(response)
    150                 self._condition.notify_all()
    151 
    152     def terminate_with_response(self, response, trailing_metadata, code,
    153                                 details):
    154         with self._condition:
    155             if self._initial_metadata is None:
    156                 self._initial_metadata = _common.FUSSED_EMPTY_METADATA
    157             self._responses.append(response)
    158             self._trailing_metadata = _common.fuss_with_metadata(
    159                 trailing_metadata)
    160             self._code = code
    161             self._details = details
    162             self._condition.notify_all()
    163 
    164     def terminate(self, trailing_metadata, code, details):
    165         with self._condition:
    166             if self._initial_metadata is None:
    167                 self._initial_metadata = _common.FUSSED_EMPTY_METADATA
    168             self._trailing_metadata = _common.fuss_with_metadata(
    169                 trailing_metadata)
    170             self._code = code
    171             self._details = details
    172             self._condition.notify_all()
    173 
    174     def cancelled(self):
    175         with self._condition:
    176             while True:
    177                 if self._code is grpc.StatusCode.CANCELLED:
    178                     return
    179                 elif self._code is None:
    180                     self._condition.wait()
    181                 else:
    182                     raise ValueError('Status code unexpectedly {}!'.format(
    183                         self._code))
    184 
    185     def is_active(self):
    186         raise NotImplementedError()
    187 
    188     def time_remaining(self):
    189         raise NotImplementedError()
    190 
    191     def add_callback(self, callback):
    192         raise NotImplementedError()
    193