Home | History | Annotate | Download | only in internal
      1 /*
      2  * Copyright 2015 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.internal;
     18 
     19 import static com.google.common.base.Charsets.UTF_8;
     20 import static org.junit.Assert.assertEquals;
     21 import static org.junit.Assert.assertNull;
     22 import static org.junit.Assert.assertTrue;
     23 import static org.junit.Assert.fail;
     24 import static org.mockito.Matchers.any;
     25 import static org.mockito.Matchers.isA;
     26 import static org.mockito.Matchers.same;
     27 import static org.mockito.Mockito.doThrow;
     28 import static org.mockito.Mockito.never;
     29 import static org.mockito.Mockito.times;
     30 import static org.mockito.Mockito.verify;
     31 import static org.mockito.Mockito.when;
     32 
     33 import com.google.common.io.CharStreams;
     34 import io.grpc.CompressorRegistry;
     35 import io.grpc.Context;
     36 import io.grpc.DecompressorRegistry;
     37 import io.grpc.InternalChannelz.ServerStats;
     38 import io.grpc.InternalChannelz.ServerStats.Builder;
     39 import io.grpc.Metadata;
     40 import io.grpc.MethodDescriptor;
     41 import io.grpc.MethodDescriptor.Marshaller;
     42 import io.grpc.MethodDescriptor.MethodType;
     43 import io.grpc.ServerCall;
     44 import io.grpc.Status;
     45 import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
     46 import io.grpc.internal.testing.SingleMessageProducer;
     47 import java.io.ByteArrayInputStream;
     48 import java.io.InputStream;
     49 import java.io.InputStreamReader;
     50 import org.junit.Before;
     51 import org.junit.Rule;
     52 import org.junit.Test;
     53 import org.junit.rules.ExpectedException;
     54 import org.junit.runner.RunWith;
     55 import org.junit.runners.JUnit4;
     56 import org.mockito.ArgumentCaptor;
     57 import org.mockito.Mock;
     58 import org.mockito.MockitoAnnotations;
     59 
     60 @RunWith(JUnit4.class)
     61 public class ServerCallImplTest {
     62   @Rule public final ExpectedException thrown = ExpectedException.none();
     63   @Mock private ServerStream stream;
     64   @Mock private ServerCall.Listener<Long> callListener;
     65 
     66   private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create();
     67   private ServerCallImpl<Long, Long> call;
     68   private Context.CancellableContext context;
     69 
     70   private static final MethodDescriptor<Long, Long> UNARY_METHOD =
     71       MethodDescriptor.<Long, Long>newBuilder()
     72           .setType(MethodType.UNARY)
     73           .setFullMethodName("service/method")
     74           .setRequestMarshaller(new LongMarshaller())
     75           .setResponseMarshaller(new LongMarshaller())
     76           .build();
     77 
     78   private static final MethodDescriptor<Long, Long> CLIENT_STREAMING_METHOD =
     79       MethodDescriptor.<Long, Long>newBuilder()
     80           .setType(MethodType.UNARY)
     81           .setFullMethodName("service/method")
     82           .setRequestMarshaller(new LongMarshaller())
     83           .setResponseMarshaller(new LongMarshaller())
     84           .build();
     85 
     86   private final Metadata requestHeaders = new Metadata();
     87 
     88   @Before
     89   public void setUp() {
     90     MockitoAnnotations.initMocks(this);
     91     context = Context.ROOT.withCancellation();
     92     call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
     93         DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
     94         serverCallTracer);
     95   }
     96 
     97   @Test
     98   public void callTracer_success() {
     99     callTracer0(Status.OK);
    100   }
    101 
    102   @Test
    103   public void callTracer_failure() {
    104     callTracer0(Status.UNKNOWN);
    105   }
    106 
    107   private void callTracer0(Status status) {
    108     CallTracer tracer = CallTracer.getDefaultFactory().create();
    109     Builder beforeBuilder = new Builder();
    110     tracer.updateBuilder(beforeBuilder);
    111     ServerStats before = beforeBuilder.build();
    112     assertEquals(0, before.callsStarted);
    113     assertEquals(0, before.lastCallStartedNanos);
    114 
    115     call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
    116         DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
    117         tracer);
    118 
    119     // required boilerplate
    120     call.sendHeaders(new Metadata());
    121     call.sendMessage(123L);
    122     // end: required boilerplate
    123 
    124     call.close(status, new Metadata());
    125     Builder afterBuilder = new Builder();
    126     tracer.updateBuilder(afterBuilder);
    127     ServerStats after = afterBuilder.build();
    128     assertEquals(1, after.callsStarted);
    129     if (status.isOk()) {
    130       assertEquals(1, after.callsSucceeded);
    131     } else {
    132       assertEquals(1, after.callsFailed);
    133     }
    134   }
    135 
    136   @Test
    137   public void request() {
    138     call.request(10);
    139 
    140     verify(stream).request(10);
    141   }
    142 
    143   @Test
    144   public void sendHeader_firstCall() {
    145     Metadata headers = new Metadata();
    146 
    147     call.sendHeaders(headers);
    148 
    149     verify(stream).writeHeaders(headers);
    150   }
    151 
    152   @Test
    153   public void sendHeader_failsOnSecondCall() {
    154     call.sendHeaders(new Metadata());
    155     thrown.expect(IllegalStateException.class);
    156     thrown.expectMessage("sendHeaders has already been called");
    157 
    158     call.sendHeaders(new Metadata());
    159   }
    160 
    161   @Test
    162   public void sendHeader_failsOnClosed() {
    163     call.close(Status.CANCELLED, new Metadata());
    164 
    165     thrown.expect(IllegalStateException.class);
    166     thrown.expectMessage("call is closed");
    167 
    168     call.sendHeaders(new Metadata());
    169   }
    170 
    171   @Test
    172   public void sendMessage() {
    173     call.sendHeaders(new Metadata());
    174     call.sendMessage(1234L);
    175 
    176     verify(stream).writeMessage(isA(InputStream.class));
    177     verify(stream).flush();
    178   }
    179 
    180   @Test
    181   public void sendMessage_failsOnClosed() {
    182     call.sendHeaders(new Metadata());
    183     call.close(Status.CANCELLED, new Metadata());
    184 
    185     thrown.expect(IllegalStateException.class);
    186     thrown.expectMessage("call is closed");
    187 
    188     call.sendMessage(1234L);
    189   }
    190 
    191   @Test
    192   public void sendMessage_failsIfheadersUnsent() {
    193     thrown.expect(IllegalStateException.class);
    194     thrown.expectMessage("sendHeaders has not been called");
    195 
    196     call.sendMessage(1234L);
    197   }
    198 
    199   @Test
    200   public void sendMessage_closesOnFailure() {
    201     call.sendHeaders(new Metadata());
    202     doThrow(new RuntimeException("bad")).when(stream).writeMessage(isA(InputStream.class));
    203 
    204     call.sendMessage(1234L);
    205 
    206     verify(stream).close(isA(Status.class), isA(Metadata.class));
    207   }
    208 
    209   @Test
    210   public void sendMessage_serverSendsOne_closeOnSecondCall_unary() {
    211     sendMessage_serverSendsOne_closeOnSecondCall(UNARY_METHOD);
    212   }
    213 
    214   @Test
    215   public void sendMessage_serverSendsOne_closeOnSecondCall_clientStreaming() {
    216     sendMessage_serverSendsOne_closeOnSecondCall(CLIENT_STREAMING_METHOD);
    217   }
    218 
    219   private void sendMessage_serverSendsOne_closeOnSecondCall(
    220       MethodDescriptor<Long, Long> method) {
    221     ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>(
    222         stream,
    223         method,
    224         requestHeaders,
    225         context,
    226         DecompressorRegistry.getDefaultInstance(),
    227         CompressorRegistry.getDefaultInstance(),
    228         serverCallTracer);
    229     serverCall.sendHeaders(new Metadata());
    230     serverCall.sendMessage(1L);
    231     verify(stream, times(1)).writeMessage(any(InputStream.class));
    232     verify(stream, never()).close(any(Status.class), any(Metadata.class));
    233 
    234     // trying to send a second message causes gRPC to close the underlying stream
    235     serverCall.sendMessage(1L);
    236     verify(stream, times(1)).writeMessage(any(InputStream.class));
    237     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    238     verify(stream, times(1)).cancel(statusCaptor.capture());
    239     assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
    240     assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription());
    241   }
    242 
    243   @Test
    244   public void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion_unary() {
    245     sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion(UNARY_METHOD);
    246   }
    247 
    248   @Test
    249   public void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion_clientStreaming() {
    250     sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion(CLIENT_STREAMING_METHOD);
    251   }
    252 
    253   private void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion(
    254       MethodDescriptor<Long, Long> method) {
    255     ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>(
    256         stream,
    257         method,
    258         requestHeaders,
    259         context,
    260         DecompressorRegistry.getDefaultInstance(),
    261         CompressorRegistry.getDefaultInstance(),
    262         serverCallTracer);
    263     serverCall.sendHeaders(new Metadata());
    264     serverCall.sendMessage(1L);
    265     serverCall.sendMessage(1L);
    266     verify(stream, times(1)).writeMessage(any(InputStream.class));
    267     verify(stream, times(1)).cancel(any(Status.class));
    268 
    269     // App runs to completion but everything is ignored
    270     serverCall.sendMessage(1L);
    271     serverCall.close(Status.OK, new Metadata());
    272     try {
    273       serverCall.close(Status.OK, new Metadata());
    274       fail("calling a second time should still cause an error");
    275     } catch (IllegalStateException expected) {
    276       // noop
    277     }
    278   }
    279 
    280   @Test
    281   public void serverSendsOne_okFailsOnMissingResponse_unary() {
    282     serverSendsOne_okFailsOnMissingResponse(UNARY_METHOD);
    283   }
    284 
    285   @Test
    286   public void serverSendsOne_okFailsOnMissingResponse_clientStreaming() {
    287     serverSendsOne_okFailsOnMissingResponse(CLIENT_STREAMING_METHOD);
    288   }
    289 
    290   private void serverSendsOne_okFailsOnMissingResponse(
    291       MethodDescriptor<Long, Long> method) {
    292     ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>(
    293         stream,
    294         method,
    295         requestHeaders,
    296         context,
    297         DecompressorRegistry.getDefaultInstance(),
    298         CompressorRegistry.getDefaultInstance(),
    299         serverCallTracer);
    300     serverCall.close(Status.OK, new Metadata());
    301     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    302     verify(stream, times(1)).cancel(statusCaptor.capture());
    303     assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode());
    304     assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription());
    305   }
    306 
    307   @Test
    308   public void serverSendsOne_canErrorWithoutResponse() {
    309     final String description = "test description";
    310     final Status status = Status.RESOURCE_EXHAUSTED.withDescription(description);
    311     final Metadata metadata = new Metadata();
    312     call.close(status, metadata);
    313     verify(stream, times(1)).close(same(status), same(metadata));
    314   }
    315 
    316   @Test
    317   public void isReady() {
    318     when(stream.isReady()).thenReturn(true);
    319 
    320     assertTrue(call.isReady());
    321   }
    322 
    323   @Test
    324   public void getAuthority() {
    325     when(stream.getAuthority()).thenReturn("fooapi.googleapis.com");
    326     assertEquals("fooapi.googleapis.com", call.getAuthority());
    327     verify(stream).getAuthority();
    328   }
    329 
    330   @Test
    331   public void getNullAuthority() {
    332     when(stream.getAuthority()).thenReturn(null);
    333     assertNull(call.getAuthority());
    334     verify(stream).getAuthority();
    335   }
    336 
    337   @Test
    338   public void setMessageCompression() {
    339     call.setMessageCompression(true);
    340 
    341     verify(stream).setMessageCompression(true);
    342   }
    343 
    344   @Test
    345   public void streamListener_halfClosed() {
    346     ServerStreamListenerImpl<Long> streamListener =
    347         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    348 
    349     streamListener.halfClosed();
    350 
    351     verify(callListener).onHalfClose();
    352   }
    353 
    354   @Test
    355   public void streamListener_halfClosed_onlyOnce() {
    356     ServerStreamListenerImpl<Long> streamListener =
    357         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    358     streamListener.halfClosed();
    359     // canceling the call should short circuit future halfClosed() calls.
    360     streamListener.closed(Status.CANCELLED);
    361 
    362     streamListener.halfClosed();
    363 
    364     verify(callListener).onHalfClose();
    365   }
    366 
    367   @Test
    368   public void streamListener_closedOk() {
    369     ServerStreamListenerImpl<Long> streamListener =
    370         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    371 
    372     streamListener.closed(Status.OK);
    373 
    374     verify(callListener).onComplete();
    375     assertTrue(context.isCancelled());
    376     assertNull(context.cancellationCause());
    377   }
    378 
    379   @Test
    380   public void streamListener_closedCancelled() {
    381     ServerStreamListenerImpl<Long> streamListener =
    382         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    383 
    384     streamListener.closed(Status.CANCELLED);
    385 
    386     verify(callListener).onCancel();
    387     assertTrue(context.isCancelled());
    388     assertNull(context.cancellationCause());
    389   }
    390 
    391   @Test
    392   public void streamListener_onReady() {
    393     ServerStreamListenerImpl<Long> streamListener =
    394         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    395 
    396     streamListener.onReady();
    397 
    398     verify(callListener).onReady();
    399   }
    400 
    401   @Test
    402   public void streamListener_onReady_onlyOnce() {
    403     ServerStreamListenerImpl<Long> streamListener =
    404         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    405     streamListener.onReady();
    406     // canceling the call should short circuit future halfClosed() calls.
    407     streamListener.closed(Status.CANCELLED);
    408 
    409     streamListener.onReady();
    410 
    411     verify(callListener).onReady();
    412   }
    413 
    414   @Test
    415   public void streamListener_messageRead() {
    416     ServerStreamListenerImpl<Long> streamListener =
    417         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    418     streamListener.messagesAvailable(new SingleMessageProducer(UNARY_METHOD.streamRequest(1234L)));
    419 
    420     verify(callListener).onMessage(1234L);
    421   }
    422 
    423   @Test
    424   public void streamListener_messageRead_onlyOnce() {
    425     ServerStreamListenerImpl<Long> streamListener =
    426         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    427     streamListener.messagesAvailable(new SingleMessageProducer(UNARY_METHOD.streamRequest(1234L)));
    428     // canceling the call should short circuit future halfClosed() calls.
    429     streamListener.closed(Status.CANCELLED);
    430 
    431     streamListener.messagesAvailable(new SingleMessageProducer(UNARY_METHOD.streamRequest(1234L)));
    432 
    433     verify(callListener).onMessage(1234L);
    434   }
    435 
    436   @Test
    437   public void streamListener_unexpectedRuntimeException() {
    438     ServerStreamListenerImpl<Long> streamListener =
    439         new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context);
    440     doThrow(new RuntimeException("unexpected exception"))
    441         .when(callListener)
    442         .onMessage(any(Long.class));
    443 
    444     InputStream inputStream = UNARY_METHOD.streamRequest(1234L);
    445 
    446     thrown.expect(RuntimeException.class);
    447     thrown.expectMessage("unexpected exception");
    448     streamListener.messagesAvailable(new SingleMessageProducer(inputStream));
    449   }
    450 
    451   private static class LongMarshaller implements Marshaller<Long> {
    452     @Override
    453     public InputStream stream(Long value) {
    454       return new ByteArrayInputStream(value.toString().getBytes(UTF_8));
    455     }
    456 
    457     @Override
    458     public Long parse(InputStream stream) {
    459       try {
    460         return Long.parseLong(CharStreams.toString(new InputStreamReader(stream, UTF_8)));
    461       } catch (Exception e) {
    462         throw new RuntimeException(e);
    463       }
    464     }
    465   }
    466 }
    467