Home | History | Annotate | Download | only in Grpc
      1 <?php
      2 /*
      3  *
      4  * Copyright 2015 gRPC authors.
      5  *
      6  * Licensed under the Apache License, Version 2.0 (the "License");
      7  * you may not use this file except in compliance with the License.
      8  * You may obtain a copy of the License at
      9  *
     10  *     http://www.apache.org/licenses/LICENSE-2.0
     11  *
     12  * Unless required by applicable law or agreed to in writing, software
     13  * distributed under the License is distributed on an "AS IS" BASIS,
     14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15  * See the License for the specific language governing permissions and
     16  * limitations under the License.
     17  *
     18  */
     19 
     20 namespace Grpc;
     21 
     22 /**
     23  * Base class for generated client stubs. Stub methods are expected to call
     24  * _simpleRequest or _streamRequest and return the result.
     25  */
     26 class BaseStub
     27 {
     28     private $hostname;
     29     private $hostname_override;
     30     private $channel;
     31     private $call_invoker;
     32 
     33     // a callback function
     34     private $update_metadata;
     35 
     36     /**
     37      * @param string  $hostname
     38      * @param array   $opts
     39      *  - 'update_metadata': (optional) a callback function which takes in a
     40      * metadata array, and returns an updated metadata array
     41      *  - 'grpc.primary_user_agent': (optional) a user-agent string
     42      * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
     43      */
     44     public function __construct($hostname, $opts, $channel = null)
     45     {
     46         $ssl_roots = file_get_contents(
     47             dirname(__FILE__).'/../../../../etc/roots.pem'
     48         );
     49         ChannelCredentials::setDefaultRootsPem($ssl_roots);
     50 
     51         $this->hostname = $hostname;
     52         $this->update_metadata = null;
     53         if (isset($opts['update_metadata'])) {
     54             if (is_callable($opts['update_metadata'])) {
     55                 $this->update_metadata = $opts['update_metadata'];
     56             }
     57             unset($opts['update_metadata']);
     58         }
     59         if (!empty($opts['grpc.ssl_target_name_override'])) {
     60             $this->hostname_override = $opts['grpc.ssl_target_name_override'];
     61         }
     62         if (isset($opts['grpc_call_invoker'])) {
     63             $this->call_invoker = $opts['grpc_call_invoker'];
     64             unset($opts['grpc_call_invoker']);
     65             $channel_opts = $this->updateOpts($opts);
     66             // If the grpc_call_invoker is defined, use the channel created by the call invoker.
     67             $this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts);
     68             return;
     69         }
     70         $this->call_invoker = new DefaultCallInvoker();
     71         if ($channel) {
     72             if (!is_a($channel, 'Grpc\Channel') &&
     73                 !is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
     74                 throw new \Exception('The channel argument is not a Channel object '.
     75                     'or an InterceptorChannel object created by '.
     76                     'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
     77             }
     78             $this->channel = $channel;
     79             return;
     80         }
     81 
     82         $this->channel = static::getDefaultChannel($hostname, $opts);
     83     }
     84 
     85     private static function updateOpts($opts) {
     86         if (!file_exists($composerFile = __DIR__.'/../../composer.json')) {
     87             // for grpc/grpc-php subpackage
     88             $composerFile = __DIR__.'/../composer.json';
     89         }
     90         $package_config = json_decode(file_get_contents($composerFile), true);
     91         if (!empty($opts['grpc.primary_user_agent'])) {
     92             $opts['grpc.primary_user_agent'] .= ' ';
     93         } else {
     94             $opts['grpc.primary_user_agent'] = '';
     95         }
     96         $opts['grpc.primary_user_agent'] .=
     97             'grpc-php/'.$package_config['version'];
     98         if (!array_key_exists('credentials', $opts)) {
     99             throw new \Exception("The opts['credentials'] key is now ".
    100                 'required. Please see one of the '.
    101                 'ChannelCredentials::create methods');
    102         }
    103         return $opts;
    104     }
    105 
    106     /**
    107      * Creates and returns the default Channel
    108      *
    109      * @param array $opts Channel constructor options
    110      *
    111      * @return Channel The channel
    112      */
    113     public static function getDefaultChannel($hostname, array $opts)
    114     {
    115         $channel_opts = self::updateOpts($opts);
    116         return new Channel($hostname, $opts);
    117     }
    118 
    119     /**
    120      * @return string The URI of the endpoint
    121      */
    122     public function getTarget()
    123     {
    124         return $this->channel->getTarget();
    125     }
    126 
    127     /**
    128      * @param bool $try_to_connect (optional)
    129      *
    130      * @return int The grpc connectivity state
    131      */
    132     public function getConnectivityState($try_to_connect = false)
    133     {
    134         return $this->channel->getConnectivityState($try_to_connect);
    135     }
    136 
    137     /**
    138      * @param int $timeout in microseconds
    139      *
    140      * @return bool true if channel is ready
    141      * @throw Exception if channel is in FATAL_ERROR state
    142      */
    143     public function waitForReady($timeout)
    144     {
    145         $new_state = $this->getConnectivityState(true);
    146         if ($this->_checkConnectivityState($new_state)) {
    147             return true;
    148         }
    149 
    150         $now = Timeval::now();
    151         $delta = new Timeval($timeout);
    152         $deadline = $now->add($delta);
    153 
    154         while ($this->channel->watchConnectivityState($new_state, $deadline)) {
    155             // state has changed before deadline
    156             $new_state = $this->getConnectivityState();
    157             if ($this->_checkConnectivityState($new_state)) {
    158                 return true;
    159             }
    160         }
    161         // deadline has passed
    162         $new_state = $this->getConnectivityState();
    163 
    164         return $this->_checkConnectivityState($new_state);
    165     }
    166 
    167     /**
    168      * Close the communication channel associated with this stub.
    169      */
    170     public function close()
    171     {
    172         $this->channel->close();
    173     }
    174 
    175     /**
    176      * @param $new_state Connect state
    177      *
    178      * @return bool true if state is CHANNEL_READY
    179      * @throw Exception if state is CHANNEL_FATAL_FAILURE
    180      */
    181     private function _checkConnectivityState($new_state)
    182     {
    183         if ($new_state == \Grpc\CHANNEL_READY) {
    184             return true;
    185         }
    186         if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) {
    187             throw new \Exception('Failed to connect to server');
    188         }
    189 
    190         return false;
    191     }
    192 
    193     /**
    194      * constructs the auth uri for the jwt.
    195      *
    196      * @param string $method The method string
    197      *
    198      * @return string The URL string
    199      */
    200     private function _get_jwt_aud_uri($method)
    201     {
    202         $last_slash_idx = strrpos($method, '/');
    203         if ($last_slash_idx === false) {
    204             throw new \InvalidArgumentException(
    205                 'service name must have a slash'
    206             );
    207         }
    208         $service_name = substr($method, 0, $last_slash_idx);
    209 
    210         if ($this->hostname_override) {
    211             $hostname = $this->hostname_override;
    212         } else {
    213             $hostname = $this->hostname;
    214         }
    215 
    216         return 'https://'.$hostname.$service_name;
    217     }
    218 
    219     /**
    220      * validate and normalize the metadata array.
    221      *
    222      * @param array $metadata The metadata map
    223      *
    224      * @return array $metadata Validated and key-normalized metadata map
    225      * @throw InvalidArgumentException if key contains invalid characters
    226      */
    227     private function _validate_and_normalize_metadata($metadata)
    228     {
    229         $metadata_copy = [];
    230         foreach ($metadata as $key => $value) {
    231             if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) {
    232                 throw new \InvalidArgumentException(
    233                     'Metadata keys must be nonempty strings containing only '.
    234                     'alphanumeric characters, hyphens and underscores'
    235                 );
    236             }
    237             $metadata_copy[strtolower($key)] = $value;
    238         }
    239 
    240         return $metadata_copy;
    241     }
    242 
    243     /**
    244      * Create a function which can be used to create UnaryCall
    245      *
    246      * @param Channel|InterceptorChannel   $channel
    247      * @param callable $deserialize A function that deserializes the response
    248      *
    249      * @return \Closure
    250      */
    251     private function _GrpcUnaryUnary($channel)
    252     {
    253         return function ($method,
    254                          $argument,
    255                          $deserialize,
    256                          array $metadata = [],
    257                          array $options = []) use ($channel) {
    258             $call = $this->call_invoker->UnaryCall(
    259                 $channel,
    260                 $method,
    261                 $deserialize,
    262                 $options
    263             );
    264             $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
    265             if (is_callable($this->update_metadata)) {
    266                 $metadata = call_user_func(
    267                     $this->update_metadata,
    268                     $metadata,
    269                     $jwt_aud_uri
    270                 );
    271             }
    272             $metadata = $this->_validate_and_normalize_metadata(
    273                 $metadata
    274             );
    275             $call->start($argument, $metadata, $options);
    276             return $call;
    277         };
    278     }
    279 
    280     /**
    281      * Create a function which can be used to create ServerStreamingCall
    282      *
    283      * @param Channel|InterceptorChannel   $channel
    284      * @param callable $deserialize A function that deserializes the response
    285      *
    286      * @return \Closure
    287      */
    288     private function _GrpcStreamUnary($channel)
    289     {
    290         return function ($method,
    291                          $deserialize,
    292                          array $metadata = [],
    293                          array $options = []) use ($channel) {
    294             $call = $this->call_invoker->ClientStreamingCall(
    295                 $channel,
    296                 $method,
    297                 $deserialize,
    298                 $options
    299             );
    300             $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
    301             if (is_callable($this->update_metadata)) {
    302                 $metadata = call_user_func(
    303                     $this->update_metadata,
    304                     $metadata,
    305                     $jwt_aud_uri
    306                 );
    307             }
    308             $metadata = $this->_validate_and_normalize_metadata(
    309                 $metadata
    310             );
    311             $call->start($metadata);
    312             return $call;
    313         };
    314     }
    315 
    316     /**
    317      * Create a function which can be used to create ClientStreamingCall
    318      *
    319      * @param Channel|InterceptorChannel   $channel
    320      * @param callable $deserialize A function that deserializes the response
    321      *
    322      * @return \Closure
    323      */
    324     private function _GrpcUnaryStream($channel)
    325     {
    326         return function ($method,
    327                          $argument,
    328                          $deserialize,
    329                          array $metadata = [],
    330                          array $options = []) use ($channel) {
    331             $call = $this->call_invoker->ServerStreamingCall(
    332                 $channel,
    333                 $method,
    334                 $deserialize,
    335                 $options
    336             );
    337             $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
    338             if (is_callable($this->update_metadata)) {
    339                 $metadata = call_user_func(
    340                     $this->update_metadata,
    341                     $metadata,
    342                     $jwt_aud_uri
    343                 );
    344             }
    345             $metadata = $this->_validate_and_normalize_metadata(
    346                 $metadata
    347             );
    348             $call->start($argument, $metadata, $options);
    349             return $call;
    350         };
    351     }
    352 
    353     /**
    354      * Create a function which can be used to create BidiStreamingCall
    355      *
    356      * @param Channel|InterceptorChannel   $channel
    357      * @param callable $deserialize A function that deserializes the response
    358      *
    359      * @return \Closure
    360      */
    361     private function _GrpcStreamStream($channel)
    362     {
    363         return function ($method,
    364                          $deserialize,
    365                          array $metadata = [],
    366                          array $options = []) use ($channel) {
    367             $call = $this->call_invoker->BidiStreamingCall(
    368                 $channel,
    369                 $method,
    370                 $deserialize,
    371                 $options
    372             );
    373             $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
    374             if (is_callable($this->update_metadata)) {
    375                 $metadata = call_user_func(
    376                     $this->update_metadata,
    377                     $metadata,
    378                     $jwt_aud_uri
    379                 );
    380             }
    381             $metadata = $this->_validate_and_normalize_metadata(
    382                 $metadata
    383             );
    384             $call->start($metadata);
    385 
    386             return $call;
    387         };
    388     }
    389 
    390     /**
    391      * Create a function which can be used to create UnaryCall
    392      *
    393      * @param Channel|InterceptorChannel   $channel
    394      * @param callable $deserialize A function that deserializes the response
    395      *
    396      * @return \Closure
    397      */
    398     private function _UnaryUnaryCallFactory($channel)
    399     {
    400         if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
    401             return function ($method,
    402                              $argument,
    403                              $deserialize,
    404                              array $metadata = [],
    405                              array $options = []) use ($channel) {
    406                 return $channel->getInterceptor()->interceptUnaryUnary(
    407                     $method,
    408                     $argument,
    409                     $deserialize,
    410                     $metadata,
    411                     $options,
    412                     $this->_UnaryUnaryCallFactory($channel->getNext())
    413                 );
    414             };
    415         }
    416         return $this->_GrpcUnaryUnary($channel);
    417     }
    418 
    419     /**
    420      * Create a function which can be used to create ServerStreamingCall
    421      *
    422      * @param Channel|InterceptorChannel   $channel
    423      * @param callable $deserialize A function that deserializes the response
    424      *
    425      * @return \Closure
    426      */
    427     private function _UnaryStreamCallFactory($channel)
    428     {
    429         if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
    430             return function ($method,
    431                              $argument,
    432                              $deserialize,
    433                              array $metadata = [],
    434                              array $options = []) use ($channel) {
    435                 return $channel->getInterceptor()->interceptUnaryStream(
    436                     $method,
    437                     $argument,
    438                     $deserialize,
    439                     $metadata,
    440                     $options,
    441                     $this->_UnaryStreamCallFactory($channel->getNext())
    442                 );
    443             };
    444         }
    445         return $this->_GrpcUnaryStream($channel);
    446     }
    447 
    448     /**
    449      * Create a function which can be used to create ClientStreamingCall
    450      *
    451      * @param Channel|InterceptorChannel   $channel
    452      * @param callable $deserialize A function that deserializes the response
    453      *
    454      * @return \Closure
    455      */
    456     private function _StreamUnaryCallFactory($channel)
    457     {
    458         if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
    459             return function ($method,
    460                              $deserialize,
    461                              array $metadata = [],
    462                              array $options = []) use ($channel) {
    463                 return $channel->getInterceptor()->interceptStreamUnary(
    464                     $method,
    465                     $deserialize,
    466                     $metadata,
    467                     $options,
    468                     $this->_StreamUnaryCallFactory($channel->getNext())
    469                 );
    470             };
    471         }
    472         return $this->_GrpcStreamUnary($channel);
    473     }
    474 
    475     /**
    476      * Create a function which can be used to create BidiStreamingCall
    477      *
    478      * @param Channel|InterceptorChannel   $channel
    479      * @param callable $deserialize A function that deserializes the response
    480      *
    481      * @return \Closure
    482      */
    483     private function _StreamStreamCallFactory($channel)
    484     {
    485         if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
    486             return function ($method,
    487                              $deserialize,
    488                              array $metadata = [],
    489                              array $options = []) use ($channel) {
    490                 return $channel->getInterceptor()->interceptStreamStream(
    491                     $method,
    492                     $deserialize,
    493                     $metadata,
    494                     $options,
    495                     $this->_StreamStreamCallFactory($channel->getNext())
    496                 );
    497             };
    498         }
    499         return $this->_GrpcStreamStream($channel);
    500     }
    501 
    502     /* This class is intended to be subclassed by generated code, so
    503      * all functions begin with "_" to avoid name collisions. */
    504     /**
    505      * Call a remote method that takes a single argument and has a
    506      * single output.
    507      *
    508      * @param string   $method      The name of the method to call
    509      * @param mixed    $argument    The argument to the method
    510      * @param callable $deserialize A function that deserializes the response
    511      * @param array    $metadata    A metadata map to send to the server
    512      *                              (optional)
    513      * @param array    $options     An array of options (optional)
    514      *
    515      * @return UnaryCall The active call object
    516      */
    517     protected function _simpleRequest(
    518         $method,
    519         $argument,
    520         $deserialize,
    521         array $metadata = [],
    522         array $options = []
    523     ) {
    524         $call_factory = $this->_UnaryUnaryCallFactory($this->channel);
    525         $call = $call_factory($method, $argument, $deserialize, $metadata, $options);
    526         return $call;
    527     }
    528 
    529     /**
    530      * Call a remote method that takes a stream of arguments and has a single
    531      * output.
    532      *
    533      * @param string   $method      The name of the method to call
    534      * @param callable $deserialize A function that deserializes the response
    535      * @param array    $metadata    A metadata map to send to the server
    536      *                              (optional)
    537      * @param array    $options     An array of options (optional)
    538      *
    539      * @return ClientStreamingCall The active call object
    540      */
    541     protected function _clientStreamRequest(
    542         $method,
    543         $deserialize,
    544         array $metadata = [],
    545         array $options = []
    546     ) {
    547         $call_factory = $this->_StreamUnaryCallFactory($this->channel);
    548         $call = $call_factory($method, $deserialize, $metadata, $options);
    549         return $call;
    550     }
    551 
    552     /**
    553      * Call a remote method that takes a single argument and returns a stream
    554      * of responses.
    555      *
    556      * @param string   $method      The name of the method to call
    557      * @param mixed    $argument    The argument to the method
    558      * @param callable $deserialize A function that deserializes the responses
    559      * @param array    $metadata    A metadata map to send to the server
    560      *                              (optional)
    561      * @param array    $options     An array of options (optional)
    562      *
    563      * @return ServerStreamingCall The active call object
    564      */
    565     protected function _serverStreamRequest(
    566         $method,
    567         $argument,
    568         $deserialize,
    569         array $metadata = [],
    570         array $options = []
    571     ) {
    572         $call_factory = $this->_UnaryStreamCallFactory($this->channel);
    573         $call = $call_factory($method, $argument, $deserialize, $metadata, $options);
    574         return $call;
    575     }
    576 
    577     /**
    578      * Call a remote method with messages streaming in both directions.
    579      *
    580      * @param string   $method      The name of the method to call
    581      * @param callable $deserialize A function that deserializes the responses
    582      * @param array    $metadata    A metadata map to send to the server
    583      *                              (optional)
    584      * @param array    $options     An array of options (optional)
    585      *
    586      * @return BidiStreamingCall The active call object
    587      */
    588     protected function _bidiRequest(
    589         $method,
    590         $deserialize,
    591         array $metadata = [],
    592         array $options = []
    593     ) {
    594         $call_factory = $this->_StreamStreamCallFactory($this->channel);
    595         $call = $call_factory($method, $deserialize, $metadata, $options);
    596         return $call;
    597     }
    598 }
    599