Home | History | Annotate | Download | only in GRPCClient
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #import "GRPCCall.h"
     20 
     21 #import "GRPCCall+OAuth2.h"
     22 
     23 #import <RxLibrary/GRXConcurrentWriteable.h>
     24 #import <RxLibrary/GRXImmediateSingleWriter.h>
     25 #include <grpc/grpc.h>
     26 #include <grpc/support/time.h>
     27 
     28 #import "private/GRPCConnectivityMonitor.h"
     29 #import "private/GRPCHost.h"
     30 #import "private/GRPCRequestHeaders.h"
     31 #import "private/GRPCWrappedCall.h"
     32 #import "private/NSData+GRPC.h"
     33 #import "private/NSDictionary+GRPC.h"
     34 #import "private/NSError+GRPC.h"
     35 
     // Upper bound on the number of ops a client can put in one op batch:
     // SEND_INITIAL_METADATA, SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT,
     // RECV_INITIAL_METADATA, RECV_MESSAGE and RECV_STATUS_ON_CLIENT.
     NSInteger kMaxClientBatch = 6;

     // Name of the environment variable read (via getenv) before registering with or
     // unregistering from GRPCConnectivityMonitor. When its value starts with '1' the
     // connectivity monitor is not used.
     const char *kCFStreamVarName = "grpc_cfstream";

     // userInfo keys under which the response headers and trailers are attached to the
     // NSError produced when an RPC finishes with an error.
     NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey";
     NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey";

     // Request header name, and value prefix, used when a token provider supplies a token.
     static NSString *const kAuthorizationHeader = @"authorization";
     static NSString *const kBearerPrefix = @"Bearer ";

     // Maps "host/path" strings to the initial-metadata flags (boxed as NSNumber) configured
     // through +setCallSafety:host:path:. Created in +initialize.
     static NSMutableDictionary *callFlags;
     49 
     // Private class extension. GRPCCall acts as the GRXWriteable that its own requests
     // writer writes into (see -writeValue: and -writesFinishedWithError:).
     @interface GRPCCall () <GRXWriteable>

     // Redeclared here so that the implementation can assign them.
     @property(strong, atomic) NSDictionary *responseHeaders;
     @property(strong, atomic) NSDictionary *responseTrailers;

     // YES from the moment -startWithWriteable: asks the token provider for a token until its
     // handler has started the call, or until -cancel resets it.
     @property(assign, atomic) BOOL isWaitingForToken;

     @end
     56 
     // Serialization rules for the underlying C gRPC call object.
     //
     // These methods of a C gRPC call aren't reentrant, so calls to them must be serialized:
     // - start_batch
     // - destroy
     //
     // start_batch with a SEND_MESSAGE argument may only be called once the OP_COMPLETE event
     // for any previous write has been received. We get that by pausing the requests writer
     // as soon as it writes a value, and resuming it when OP_COMPLETE arrives.
     //
     // Likewise, start_batch with a RECV_MESSAGE argument may only be called once the
     // OP_COMPLETE event for any previous read has been received. This is easier to enforce,
     // because received messages are written into the writeable: start_batch is enqueued once
     // upon receiving the OP_COMPLETE event for the RECV_METADATA batch, and then once after
     // receiving each OP_COMPLETE event for each RECV_MESSAGE batch.
     @implementation GRPCCall {
       // Serial queue on which the non-reentrant operations above are issued.
       dispatch_queue_t _callQueue;

       NSString *_host;
       NSString *_path;
       GRPCWrappedCall *_wrappedCall;
       GRPCConnectivityMonitor *_connectivityMonitor;

       // The C gRPC library gives weaker guarantees on event ordering than we do; in the face
       // of errors it gives none at all. Wrapping the real writeable in this object provides
       // thread-safety and correct ordering.
       GRXConcurrentWriteable *_responseWriteable;

       // The network thread wants the requestWriter to resume (when the server is ready for
       // more input) or to stop (on errors), concurrently with user threads that want to start
       // it, pause it or stop it. A writer isn't thread-safe, so those operations are
       // synchronized on it. A dispatch queue isn't used for that, because the writer can call
       // writeValue: or writesFinishedWithError: on this GRPCCall as part of those operations.
       // We want to pause the writer immediately on writeValue:, so the locking has to be
       // recursive.
       GRXWriter *_requestWriter;

       // Deliberate retain cycle that exists from the moment the call is started until it
       // finishes (see |startWithWriteable:| and |finishWithError:|). It spares users from
       // keeping a reference to the call when all they care about is the handler that runs
       // when the response arrives.
       GRPCCall *_retainSelf;

       GRPCRequestHeaders *_requestHeaders;

       // When the call is unary (i.e. the writer given to GRPCCall is a
       // GRXImmediateSingleWriter), ops are not sent to C core immediately. They are buffered
       // in _unaryOpBatch, and the whole batch is sent when the SendClose op is added.
       BOOL _unaryCall;
       NSMutableArray *_unaryOpBatch;

       // Dispatch queue on which responses are delivered to the user. Defaults to the main
       // dispatch queue.
       dispatch_queue_t _responseQueue;

       // Whether the call has finished. Once set, finishWithError: must not be called again.
       BOOL _finished;
     }

     @synthesize state = _state;
    119 
    // One-time setup for the class: initializes the gRPC core library and creates the
    // call-flags table. The class check makes the setup run only once; see
    // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
    + (void)initialize {
      if (self != [GRPCCall self]) {
        return;
      }
      grpc_init();
      callFlags = [NSMutableDictionary dictionary];
    }
    128 
    // Records the initial-metadata flags to use for every call to |path| on |host|.
    //
    // callSafety: GRPCCallSafetyDefault stores no flags (0); GRPCCallSafetyIdempotentRequest
    //             and GRPCCallSafetyCacheableRequest store the matching
    //             GRPC_INITIAL_METADATA_* flag. Any other value leaves the table untouched.
    // host, path: combined as "host/path" to form the table key.
    //
    // The table is a plain NSMutableDictionary shared by all calls and read whenever a call
    // sends its headers, so the write is done while holding a lock on the table.
    + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
      NSNumber *flags = nil;
      switch (callSafety) {
        case GRPCCallSafetyDefault:
          flags = @0;
          break;
        case GRPCCallSafetyIdempotentRequest:
          flags = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
          break;
        case GRPCCallSafetyCacheableRequest:
          flags = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
          break;
        default:
          break;
      }
      if (flags == nil) {
        // Unknown safety value: nothing to record.
        return;
      }

      NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
      @synchronized(callFlags) {
        callFlags[hostAndPath] = flags;
      }
    }
    145 
    // Returns the initial-metadata flags previously recorded for |path| on |host| with
    // +setCallSafety:host:path:, or 0 when nothing was recorded for that "host/path" key.
    //
    // The lookup holds a lock on the table, because the table is a mutable dictionary that
    // +setCallSafety:host:path: may be modifying at the same time. The value is read back as
    // an unsigned int to match the uint32_t flag type this method returns.
    + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
      NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
      NSNumber *flags = nil;
      @synchronized(callFlags) {
        flags = callFlags[hostAndPath];
      }
      // Messaging nil yields 0, i.e. "no flags" for unknown keys.
      return [flags unsignedIntValue];
    }
    150 
    // Forwards to the designated initializer with nil arguments. The designated initializer
    // raises NSInvalidArgumentException when host or path is nil.
    - (instancetype)init {
      self = [self initWithHost:nil path:nil requestsWriter:nil];
      return self;
    }
    154 
    // Designated initializer.
    //
    // host, path:    copied; neither may be nil.
    // requestWriter: source of the request messages; it must not have been started yet.
    //
    // Raises NSInvalidArgumentException when either precondition is violated.
    - (instancetype)initWithHost:(NSString *)host
                            path:(NSString *)path
                  requestsWriter:(GRXWriter *)requestWriter {
      if (host == nil || path == nil) {
        [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
      }
      if (requestWriter.state != GRXWriterStateNotStarted) {
        [NSException raise:NSInvalidArgumentException
                    format:@"The requests writer can't be already started."];
      }

      self = [super init];
      if (!self) {
        return self;
      }

      _host = [host copy];
      _path = [path copy];

      // Serial queue on which the non-reentrant methods of the grpc_call object are invoked.
      _callQueue = dispatch_queue_create("io.grpc.call", NULL);

      _requestWriter = requestWriter;
      _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];

      // A GRXImmediateSingleWriter marks the call as unary; its ops are buffered into one
      // batch instead of being sent one at a time.
      BOOL isUnary = [requestWriter isKindOfClass:[GRXImmediateSingleWriter class]];
      if (isUnary) {
        _unaryCall = YES;
        _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
      }

      // Responses go to the main queue unless -setResponseDispatchQueue: changes it.
      _responseQueue = dispatch_get_main_queue();
      return self;
    }
    186 
    // Sets the queue on which responses are delivered. Only takes effect while the call has
    // not been started; afterwards the request is silently ignored.
    - (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
      if (_state == GRXWriterStateNotStarted) {
        _responseQueue = queue;
      }
    }
    193 
    194 #pragma mark Finish
    195 
    // Moves the call to the finished state and tears it down.
    //
    // errorOrNil: when non-nil the response writeable is cancelled with it; when nil the
    //             response writeable is told the call completed successfully.
    //
    // Callers go through -maybeFinishWithError:, which makes sure this runs at most once.
    - (void)finishWithError:(NSError *)errorOrNil {
      @synchronized(self) {
        _state = GRXWriterStateFinished;
      }

      // Stop any request messages that were still coming.
      @synchronized(_requestWriter) {
        _requestWriter.state = GRXWriterStateFinished;
      }

      if (errorOrNil == nil) {
        [_responseWriteable enqueueSuccessfulCompletion];
      } else {
        [_responseWriteable cancelWithError:errorOrNil];
      }

      // The connectivity monitor is not required for CFStream, so only unregister when
      // CFStream is not enabled through the environment.
      const char *cfStreamSetting = getenv(kCFStreamVarName);
      BOOL cfStreamEnabled = (cfStreamSetting != NULL && cfStreamSetting[0] == '1');
      if (!cfStreamEnabled) {
        [GRPCConnectivityMonitor unregisterObserver:self];
      }

      // Break the retain cycle: if nothing else retains the call it can be deallocated now.
      _retainSelf = nil;
    }
    221 
    // Cancels the underlying wrapped call.
    // Can be called from any thread, any number of times.
    - (void)cancelCall {
      GRPCWrappedCall *call = _wrappedCall;
      [call cancel];
    }
    226 
    // Cancels the RPC on behalf of the app and finishes it with a GRPCErrorCodeCancelled
    // error. While the call is still waiting for a token it only clears that flag, so the
    // token handler will not start the call; otherwise the underlying call is cancelled.
    - (void)cancel {
      if (self.isWaitingForToken) {
        self.isWaitingForToken = NO;
      } else {
        [self cancelCall];
      }

      NSError *cancellation =
          [NSError errorWithDomain:kGRPCErrorDomain
                              code:GRPCErrorCodeCancelled
                          userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}];
      [self maybeFinishWithError:cancellation];
    }
    239 
    // Finishes the call with |errorOrNil| unless it has already been finished. The _finished
    // flag is tested and set under a lock on self, so only the first caller proceeds to
    // -finishWithError:, which itself runs outside that lock.
    - (void)maybeFinishWithError:(NSError *)errorOrNil {
      BOOL isFirstFinish = NO;
      @synchronized(self) {
        if (!_finished) {
          isFirstFinish = YES;
          _finished = YES;
        }
      }
      if (!isFirstFinish) {
        return;
      }
      [self finishWithError:errorOrNil];
    }
    252 
    // Hands the wrapped call over to the call queue: a __block variable keeps a reference to
    // it, and that reference is cleared by a block dispatched to _callQueue.
    - (void)dealloc {
      __block GRPCWrappedCall *callToRelease = _wrappedCall;
      dispatch_async(_callQueue, ^{
        callToRelease = nil;
      });
    }
    259 
    260 #pragma mark Read messages
    261 
    // Starts a single-op batch that receives one message and passes it to |handler|.
    // Only called from the call queue.
    // The handler will be called from the network queue.
    - (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
      // TODO(jcanizales): Add error handlers for async failures
      GRPCOpRecvMessage *receiveOp = [[GRPCOpRecvMessage alloc] initWithHandler:handler];
      [_wrappedCall startBatchWithOperations:@[ receiveOp ]];
    }
    268 
    // Requests the next response message from the server.
    //
    // Called initially from the network queue once response headers are received, then
    // "recursively" from the responseWriteable queue after each response from the server has
    // been written.
    // If the call is currently paused, this is a noop. Restarting the call will invoke this
    // method.
    // TODO(jcanizales): Rename to readResponseIfNotPaused.
    - (void)startNextRead {
      @synchronized(self) {
        BOOL isPaused = (self.state == GRXWriterStatePaused);
        if (isPaused) {
          return;
        }
      }

      dispatch_async(_callQueue, ^{
        // The read handler holds the call and the writeable weakly; they are only turned into
        // strong references for the duration of one handler invocation.
        __weak GRPCCall *weakCall = self;
        __weak GRXConcurrentWriteable *weakResponses = self->_responseWriteable;

        void (^onMessage)(grpc_byte_buffer *) = ^(grpc_byte_buffer *buffer) {
          __strong GRPCCall *call = weakCall;
          __strong GRXConcurrentWriteable *responses = weakResponses;
          if (buffer == NULL) {
            // No more messages from the server
            return;
          }

          NSData *payload = [NSData grpc_dataWithByteBuffer:buffer];
          grpc_byte_buffer_destroy(buffer);

          if (payload) {
            // Deliver the message, and read the next one only after it has been written.
            [responses enqueueValue:payload
                  completionHandler:^{
                    [call startNextRead];
                  }];
            return;
          }

          // The app doesn't have enough memory to hold the server response. We don't want to
          // throw, because the app shouldn't crash for a behavior that's on the hands of any
          // server to have. Instead we finish and ask the server to cancel.
          [call cancelCall];
          NSError *outOfMemory =
              [NSError errorWithDomain:kGRPCErrorDomain
                                  code:GRPCErrorCodeResourceExhausted
                              userInfo:@{
                                NSLocalizedDescriptionKey :
                                    @"Client does not have enough memory to "
                                    @"hold the server response."
                              }];
          [call maybeFinishWithError:outOfMemory];
        };

        [self startReadWithHandler:onMessage];
      });
    }
    317 
    318 #pragma mark Send headers
    319 
    // Sends |headers| as the call's initial metadata, together with the flags configured for
    // this host and path. For a unary call the op is appended to the pending batch instead of
    // being sent right away.
    - (void)sendHeaders:(NSDictionary *)headers {
      // TODO(jcanizales): Add error handlers for async failures
      uint32_t flags = [GRPCCall callFlagsForHost:_host path:_path];
      // No clean-up needed after SEND_INITIAL_METADATA, hence the nil handler.
      GRPCOpSendMetadata *metadataOp =
          [[GRPCOpSendMetadata alloc] initWithMetadata:headers flags:flags handler:nil];

      if (_unaryCall) {
        [_unaryOpBatch addObject:metadataOp];
      } else {
        [_wrappedCall startBatchWithOperations:@[ metadataOp ]];
      }
    }
    332 
    333 #pragma mark GRXWriteable implementation
    334 
    // Sends one request message; once the send op's handler runs, the requests writer is
    // set back to the started state.
    //
    // Only called from the call queue. The error handler will be called from the
    // network queue if the write didn't succeed.
    // If the call is a unary call, parameter \a errorHandler will be ignored and
    // the error handler of GRPCOpSendClose will be executed in case of error.
    - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
      __weak GRPCCall *weakCall = self;
      GRPCOpSendMessage *sendOp = [[GRPCOpSendMessage alloc]
          initWithMessage:message
                  handler:^{
                    // Resume the request writer.
                    GRPCCall *call = weakCall;
                    if (!call) {
                      return;
                    }
                    @synchronized(call->_requestWriter) {
                      call->_requestWriter.state = GRXWriterStateStarted;
                    }
                  }];

      if (_unaryCall) {
        // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
        // TODO (mxyan): unify the error handlers of all Ops into a single closure.
        [_unaryOpBatch addObject:sendOp];
        return;
      }
      [_wrappedCall startBatchWithOperations:@[ sendOp ] errorHandler:errorHandler];
    }
    361 
    // GRXWriteable entry point used by the requests writer: pauses the writer, then sends
    // |value| as a request message from the call queue.
    - (void)writeValue:(id)value {
      // TODO(jcanizales): Throw/assert if value isn't NSData.

      // Hold back further input; it is resumed only when the C layer notifies us that
      // writes can proceed.
      @synchronized(_requestWriter) {
        _requestWriter.state = GRXWriterStatePaused;
      }

      dispatch_block_t sendMessage = ^{
        // Write error is not processed here. It is handled by op batch of
        // GRPC_OP_RECV_STATUS_ON_CLIENT
        [self writeMessage:value withErrorHandler:nil];
      };
      dispatch_async(_callQueue, sendMessage);
    }
    376 
    // Closes the request stream. For a unary call this appends the close op to the buffered
    // batch and sends the whole batch; otherwise the close op is sent in a batch of its own.
    //
    // Only called from the call queue. The error handler will be called from the
    // network queue if the requests stream couldn't be closed successfully.
    - (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
      GRPCOpSendClose *closeOp = [[GRPCOpSendClose alloc] init];
      if (_unaryCall) {
        [_unaryOpBatch addObject:closeOp];
        [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler];
        return;
      }
      [_wrappedCall startBatchWithOperations:@[ closeOp ] errorHandler:errorHandler];
    }
    388 
    // GRXWriteable entry point used by the requests writer when it is done. An error cancels
    // the whole call; otherwise the request stream is closed from the call queue.
    - (void)writesFinishedWithError:(NSError *)errorOrNil {
      if (errorOrNil != nil) {
        [self cancel];
        return;
      }

      dispatch_async(_callQueue, ^{
        // EOS error is not processed here. It is handled by op batch of
        // GRPC_OP_RECV_STATUS_ON_CLIENT
        [self finishRequestWithErrorHandler:nil];
      });
    }
    399 
    400 #pragma mark Invoke
    401 
    // Starts the two receive batches of the call: one for the response metadata and one for
    // the final status.
    //
    // Both handlers will eventually be called, from the network queue. Writes can start
    // immediately after this.
    // headersHandler:    called when the response headers are received.
    // completionHandler: called whenever the RPC finishes for any reason.
    - (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
                       completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
      // TODO(jcanizales): Add error handlers for async failures
      GRPCOpRecvMetadata *metadataOp = [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler];
      [_wrappedCall startBatchWithOperations:@[ metadataOp ]];

      GRPCOpRecvStatus *statusOp = [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler];
      [_wrappedCall startBatchWithOperations:@[ statusOp ]];
    }
    414 
    // Initiates the RPC: installs the handlers for response headers and for completion, then
    // starts the requests writer with this call as its writeable. Both handlers reference the
    // call weakly and do nothing if it has already been deallocated.
    - (void)invokeCall {
      __weak GRPCCall *weakCall = self;

      void (^onHeaders)(NSDictionary *) = ^(NSDictionary *headers) {
        // Response headers received.
        __strong GRPCCall *call = weakCall;
        if (!call) {
          return;
        }
        call.responseHeaders = headers;
        [call startNextRead];
      };

      void (^onCompletion)(NSError *, NSDictionary *) = ^(NSError *error, NSDictionary *trailers) {
        __strong GRPCCall *call = weakCall;
        if (!call) {
          return;
        }
        call.responseTrailers = trailers;

        NSError *finalError = error;
        if (finalError) {
          // Rebuild the error so that its userInfo also carries the trailers and, when they
          // are available, the headers.
          NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
          if (finalError.userInfo) {
            [userInfo addEntriesFromDictionary:finalError.userInfo];
          }
          userInfo[kGRPCTrailersKey] = call.responseTrailers;
          // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
          // called before this one, so an error might end up with trailers but no headers. We
          // shouldn't call finishWithError until after both blocks are called. It is also when
          // this is done that we can provide a merged view of response headers and trailers in a
          // thread-safe way.
          if (call.responseHeaders) {
            userInfo[kGRPCHeadersKey] = call.responseHeaders;
          }
          finalError = [NSError errorWithDomain:finalError.domain
                                           code:finalError.code
                                       userInfo:userInfo];
        }
        [call maybeFinishWithError:finalError];
      };

      [self invokeCallWithHeadersHandler:onHeaders completionHandler:onCompletion];

      // Now that the RPC has been initiated, request writes can start.
      @synchronized(_requestWriter) {
        [_requestWriter startWithWriteable:self];
      }
    }
    454 
    455 #pragma mark GRXWriter implementation
    456 
    // Creates the response writeable and the wrapped call, sends the request headers, starts
    // the RPC and, unless CFStream is enabled through the environment, registers for
    // connectivity notifications.
    //
    // writeable: destination of the responses; it is wrapped so that delivery happens on the
    //            response queue.
    - (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
      GRXConcurrentWriteable *responses =
          [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
      _responseWriteable = responses;

      GRPCWrappedCall *call = [[GRPCWrappedCall alloc] initWithHost:_host
                                                         serverName:_serverName
                                                               path:_path
                                                            timeout:_timeout];
      _wrappedCall = call;
      NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");

      [self sendHeaders:_requestHeaders];
      [self invokeCall];

      // Connectivity monitor is not required for CFStream
      const char *cfStreamSetting = getenv(kCFStreamVarName);
      BOOL cfStreamEnabled = (cfStreamSetting != NULL && cfStreamSetting[0] == '1');
      if (!cfStreamEnabled) {
        [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
      }
    }
    476 
    // GRXWriter entry point: starts the RPC and delivers its responses to |writeable|.
    // When a token provider is set, the call starts only after the provider hands back a
    // token (sent as a "Bearer" authorization header), and only if the call was not cancelled
    // while waiting.
    - (void)startWithWriteable:(id<GRXWriteable>)writeable {
      @synchronized(self) {
        _state = GRXWriterStateStarted;
      }

      // Create a retain cycle so that this instance lives until the RPC finishes (or is
      // cancelled). This makes RPCs in which the call isn't externally retained possible (as
      // long as it is started before being autoreleased).
      // Care is taken not to retain self strongly in any of the blocks used in this
      // implementation, so that the life of the instance is determined by this retain cycle.
      _retainSelf = self;

      if (self.tokenProvider == nil) {
        [self startCallWithWriteable:writeable];
        return;
      }

      self.isWaitingForToken = YES;
      __weak typeof(self) weakCall = self;
      [self.tokenProvider getTokenWithHandler:^(NSString *token) {
        typeof(self) call = weakCall;
        // Nothing to do if the call is gone or -cancel cleared the waiting flag.
        if (!call || !call.isWaitingForToken) {
          return;
        }
        if (token) {
          NSString *headerValue = [kBearerPrefix stringByAppendingString:token];
          call.requestHeaders[kAuthorizationHeader] = headerValue;
        }
        [call startCallWithWriteable:writeable];
        call.isWaitingForToken = NO;
      }];
    }
    507 
    // Manual state transition requested by the user of the call (GRXWriter contract).
    // Requests are ignored unless the call is currently started or paused. Asking for the
    // not-started state is always ignored, and asking for the started state only has an
    // effect when the call is paused.
    - (void)setState:(GRXWriterState)newState {
      @synchronized(self) {
        // Manual transitions are only allowed from the started or paused states.
        BOOL transitionAllowed =
            (_state != GRXWriterStateNotStarted && _state != GRXWriterStateFinished);
        if (!transitionAllowed) {
          return;
        }

        if (newState == GRXWriterStateFinished) {
          _state = newState;
          // Per GRXWriter's contract, setting the state to Finished manually
          // means one doesn't wish the writeable to be messaged anymore.
          [_responseWriteable cancelSilently];
          _responseWriteable = nil;
        } else if (newState == GRXWriterStatePaused) {
          _state = newState;
        } else if (newState == GRXWriterStateStarted) {
          // Resuming: only meaningful when currently paused; reading picks up again.
          if (_state == GRXWriterStatePaused) {
            _state = newState;
            [self startNextRead];
          }
        }
      }
    }
    537 
    // Selector registered with GRPCConnectivityMonitor. Cancels the underlying call and
    // finishes the RPC with a GRPCErrorCodeUnavailable error.
    - (void)connectivityChanged:(NSNotification *)note {
      // Cancel underlying call upon this notification
      __strong GRPCCall *call = self;
      if (!call) {
        return;
      }

      [call cancelCall];
      NSError *connectivityLost =
          [NSError errorWithDomain:kGRPCErrorDomain
                              code:GRPCErrorCodeUnavailable
                          userInfo:@{NSLocalizedDescriptionKey : @"Connectivity lost."}];
      [call maybeFinishWithError:connectivityLost];
    }
    551 
    552 @end
    553