1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Charsets.UTF_8; 20 import static org.junit.Assert.assertEquals; 21 import static org.junit.Assert.assertNull; 22 import static org.junit.Assert.assertTrue; 23 import static org.junit.Assert.fail; 24 import static org.mockito.Matchers.any; 25 import static org.mockito.Matchers.isA; 26 import static org.mockito.Matchers.same; 27 import static org.mockito.Mockito.doThrow; 28 import static org.mockito.Mockito.never; 29 import static org.mockito.Mockito.times; 30 import static org.mockito.Mockito.verify; 31 import static org.mockito.Mockito.when; 32 33 import com.google.common.io.CharStreams; 34 import io.grpc.CompressorRegistry; 35 import io.grpc.Context; 36 import io.grpc.DecompressorRegistry; 37 import io.grpc.InternalChannelz.ServerStats; 38 import io.grpc.InternalChannelz.ServerStats.Builder; 39 import io.grpc.Metadata; 40 import io.grpc.MethodDescriptor; 41 import io.grpc.MethodDescriptor.Marshaller; 42 import io.grpc.MethodDescriptor.MethodType; 43 import io.grpc.ServerCall; 44 import io.grpc.Status; 45 import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; 46 import io.grpc.internal.testing.SingleMessageProducer; 47 import java.io.ByteArrayInputStream; 48 import java.io.InputStream; 49 import java.io.InputStreamReader; 50 import org.junit.Before; 51 import org.junit.Rule; 52 import org.junit.Test; 53 import org.junit.rules.ExpectedException; 54 import org.junit.runner.RunWith; 55 import org.junit.runners.JUnit4; 56 import org.mockito.ArgumentCaptor; 57 import org.mockito.Mock; 58 import org.mockito.MockitoAnnotations; 59 60 @RunWith(JUnit4.class) 61 public class ServerCallImplTest { 62 @Rule public final ExpectedException thrown = ExpectedException.none(); 63 @Mock private ServerStream stream; 64 @Mock private ServerCall.Listener<Long> callListener; 65 66 private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create(); 67 private ServerCallImpl<Long, Long> call; 68 private Context.CancellableContext context; 69 70 private static final MethodDescriptor<Long, Long> UNARY_METHOD = 71 MethodDescriptor.<Long, Long>newBuilder() 72 .setType(MethodType.UNARY) 73 .setFullMethodName("service/method") 74 .setRequestMarshaller(new LongMarshaller()) 75 .setResponseMarshaller(new LongMarshaller()) 76 .build(); 77 78 private static final MethodDescriptor<Long, Long> CLIENT_STREAMING_METHOD = 79 MethodDescriptor.<Long, Long>newBuilder() 80 .setType(MethodType.UNARY) 81 .setFullMethodName("service/method") 82 .setRequestMarshaller(new LongMarshaller()) 83 .setResponseMarshaller(new LongMarshaller()) 84 .build(); 85 86 private final Metadata requestHeaders = new Metadata(); 87 88 @Before 89 public void setUp() { 90 MockitoAnnotations.initMocks(this); 91 context = Context.ROOT.withCancellation(); 92 call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context, 93 DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), 94 serverCallTracer); 95 } 96 97 @Test 98 public void callTracer_success() { 99 callTracer0(Status.OK); 100 } 101 102 @Test 103 public void callTracer_failure() { 104 callTracer0(Status.UNKNOWN); 105 } 106 107 private void callTracer0(Status status) { 108 CallTracer tracer = CallTracer.getDefaultFactory().create(); 109 Builder beforeBuilder = new Builder(); 110 tracer.updateBuilder(beforeBuilder); 111 ServerStats before = beforeBuilder.build(); 112 assertEquals(0, before.callsStarted); 113 assertEquals(0, before.lastCallStartedNanos); 114 115 call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context, 116 DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), 117 tracer); 118 119 // required boilerplate 120 call.sendHeaders(new Metadata()); 121 call.sendMessage(123L); 122 // end: required boilerplate 123 124 call.close(status, new Metadata()); 125 Builder afterBuilder = new Builder(); 126 tracer.updateBuilder(afterBuilder); 127 ServerStats after = afterBuilder.build(); 128 assertEquals(1, after.callsStarted); 129 if (status.isOk()) { 130 assertEquals(1, after.callsSucceeded); 131 } else { 132 assertEquals(1, after.callsFailed); 133 } 134 } 135 136 @Test 137 public void request() { 138 call.request(10); 139 140 verify(stream).request(10); 141 } 142 143 @Test 144 public void sendHeader_firstCall() { 145 Metadata headers = new Metadata(); 146 147 call.sendHeaders(headers); 148 149 verify(stream).writeHeaders(headers); 150 } 151 152 @Test 153 public void sendHeader_failsOnSecondCall() { 154 call.sendHeaders(new Metadata()); 155 thrown.expect(IllegalStateException.class); 156 thrown.expectMessage("sendHeaders has already been called"); 157 158 call.sendHeaders(new Metadata()); 159 } 160 161 @Test 162 public void sendHeader_failsOnClosed() { 163 call.close(Status.CANCELLED, new Metadata()); 164 165 thrown.expect(IllegalStateException.class); 166 thrown.expectMessage("call is closed"); 167 168 call.sendHeaders(new Metadata()); 169 } 170 171 @Test 172 public void sendMessage() { 173 call.sendHeaders(new Metadata()); 174 call.sendMessage(1234L); 175 176 verify(stream).writeMessage(isA(InputStream.class)); 177 verify(stream).flush(); 178 } 179 180 @Test 181 public void sendMessage_failsOnClosed() { 182 call.sendHeaders(new Metadata()); 183 call.close(Status.CANCELLED, new Metadata()); 184 185 thrown.expect(IllegalStateException.class); 186 thrown.expectMessage("call is closed"); 187 188 call.sendMessage(1234L); 189 } 190 191 @Test 192 public void sendMessage_failsIfheadersUnsent() { 193 thrown.expect(IllegalStateException.class); 194 thrown.expectMessage("sendHeaders has not been called"); 195 196 call.sendMessage(1234L); 197 } 198 199 @Test 200 public void sendMessage_closesOnFailure() { 201 call.sendHeaders(new Metadata()); 202 doThrow(new RuntimeException("bad")).when(stream).writeMessage(isA(InputStream.class)); 203 204 call.sendMessage(1234L); 205 206 verify(stream).close(isA(Status.class), isA(Metadata.class)); 207 } 208 209 @Test 210 public void sendMessage_serverSendsOne_closeOnSecondCall_unary() { 211 sendMessage_serverSendsOne_closeOnSecondCall(UNARY_METHOD); 212 } 213 214 @Test 215 public void sendMessage_serverSendsOne_closeOnSecondCall_clientStreaming() { 216 sendMessage_serverSendsOne_closeOnSecondCall(CLIENT_STREAMING_METHOD); 217 } 218 219 private void sendMessage_serverSendsOne_closeOnSecondCall( 220 MethodDescriptor<Long, Long> method) { 221 ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>( 222 stream, 223 method, 224 requestHeaders, 225 context, 226 DecompressorRegistry.getDefaultInstance(), 227 CompressorRegistry.getDefaultInstance(), 228 serverCallTracer); 229 serverCall.sendHeaders(new Metadata()); 230 serverCall.sendMessage(1L); 231 verify(stream, times(1)).writeMessage(any(InputStream.class)); 232 verify(stream, never()).close(any(Status.class), any(Metadata.class)); 233 234 // trying to send a second message causes gRPC to close the underlying stream 235 serverCall.sendMessage(1L); 236 verify(stream, times(1)).writeMessage(any(InputStream.class)); 237 ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); 238 verify(stream, times(1)).cancel(statusCaptor.capture()); 239 assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode()); 240 assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription()); 241 } 242 243 @Test 244 public void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion_unary() { 245 sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion(UNARY_METHOD); 246 } 247 248 @Test 249 public void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion_clientStreaming() { 250 sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion(CLIENT_STREAMING_METHOD); 251 } 252 253 private void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion( 254 MethodDescriptor<Long, Long> method) { 255 ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>( 256 stream, 257 method, 258 requestHeaders, 259 context, 260 DecompressorRegistry.getDefaultInstance(), 261 CompressorRegistry.getDefaultInstance(), 262 serverCallTracer); 263 serverCall.sendHeaders(new Metadata()); 264 serverCall.sendMessage(1L); 265 serverCall.sendMessage(1L); 266 verify(stream, times(1)).writeMessage(any(InputStream.class)); 267 verify(stream, times(1)).cancel(any(Status.class)); 268 269 // App runs to completion but everything is ignored 270 serverCall.sendMessage(1L); 271 serverCall.close(Status.OK, new Metadata()); 272 try { 273 serverCall.close(Status.OK, new Metadata()); 274 fail("calling a second time should still cause an error"); 275 } catch (IllegalStateException expected) { 276 // noop 277 } 278 } 279 280 @Test 281 public void serverSendsOne_okFailsOnMissingResponse_unary() { 282 serverSendsOne_okFailsOnMissingResponse(UNARY_METHOD); 283 } 284 285 @Test 286 public void serverSendsOne_okFailsOnMissingResponse_clientStreaming() { 287 serverSendsOne_okFailsOnMissingResponse(CLIENT_STREAMING_METHOD); 288 } 289 290 private void serverSendsOne_okFailsOnMissingResponse( 291 MethodDescriptor<Long, Long> method) { 292 ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>( 293 stream, 294 method, 295 requestHeaders, 296 context, 297 DecompressorRegistry.getDefaultInstance(), 298 CompressorRegistry.getDefaultInstance(), 299 serverCallTracer); 300 serverCall.close(Status.OK, new Metadata()); 301 ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); 302 verify(stream, times(1)).cancel(statusCaptor.capture()); 303 assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode()); 304 assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription()); 305 } 306 307 @Test 308 public void serverSendsOne_canErrorWithoutResponse() { 309 final String description = "test description"; 310 final Status status = Status.RESOURCE_EXHAUSTED.withDescription(description); 311 final Metadata metadata = new Metadata(); 312 call.close(status, metadata); 313 verify(stream, times(1)).close(same(status), same(metadata)); 314 } 315 316 @Test 317 public void isReady() { 318 when(stream.isReady()).thenReturn(true); 319 320 assertTrue(call.isReady()); 321 } 322 323 @Test 324 public void getAuthority() { 325 when(stream.getAuthority()).thenReturn("fooapi.googleapis.com"); 326 assertEquals("fooapi.googleapis.com", call.getAuthority()); 327 verify(stream).getAuthority(); 328 } 329 330 @Test 331 public void getNullAuthority() { 332 when(stream.getAuthority()).thenReturn(null); 333 assertNull(call.getAuthority()); 334 verify(stream).getAuthority(); 335 } 336 337 @Test 338 public void setMessageCompression() { 339 call.setMessageCompression(true); 340 341 verify(stream).setMessageCompression(true); 342 } 343 344 @Test 345 public void streamListener_halfClosed() { 346 ServerStreamListenerImpl<Long> streamListener = 347 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 348 349 streamListener.halfClosed(); 350 351 verify(callListener).onHalfClose(); 352 } 353 354 @Test 355 public void streamListener_halfClosed_onlyOnce() { 356 ServerStreamListenerImpl<Long> streamListener = 357 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 358 streamListener.halfClosed(); 359 // canceling the call should short circuit future halfClosed() calls. 360 streamListener.closed(Status.CANCELLED); 361 362 streamListener.halfClosed(); 363 364 verify(callListener).onHalfClose(); 365 } 366 367 @Test 368 public void streamListener_closedOk() { 369 ServerStreamListenerImpl<Long> streamListener = 370 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 371 372 streamListener.closed(Status.OK); 373 374 verify(callListener).onComplete(); 375 assertTrue(context.isCancelled()); 376 assertNull(context.cancellationCause()); 377 } 378 379 @Test 380 public void streamListener_closedCancelled() { 381 ServerStreamListenerImpl<Long> streamListener = 382 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 383 384 streamListener.closed(Status.CANCELLED); 385 386 verify(callListener).onCancel(); 387 assertTrue(context.isCancelled()); 388 assertNull(context.cancellationCause()); 389 } 390 391 @Test 392 public void streamListener_onReady() { 393 ServerStreamListenerImpl<Long> streamListener = 394 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 395 396 streamListener.onReady(); 397 398 verify(callListener).onReady(); 399 } 400 401 @Test 402 public void streamListener_onReady_onlyOnce() { 403 ServerStreamListenerImpl<Long> streamListener = 404 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 405 streamListener.onReady(); 406 // canceling the call should short circuit future halfClosed() calls. 407 streamListener.closed(Status.CANCELLED); 408 409 streamListener.onReady(); 410 411 verify(callListener).onReady(); 412 } 413 414 @Test 415 public void streamListener_messageRead() { 416 ServerStreamListenerImpl<Long> streamListener = 417 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 418 streamListener.messagesAvailable(new SingleMessageProducer(UNARY_METHOD.streamRequest(1234L))); 419 420 verify(callListener).onMessage(1234L); 421 } 422 423 @Test 424 public void streamListener_messageRead_onlyOnce() { 425 ServerStreamListenerImpl<Long> streamListener = 426 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 427 streamListener.messagesAvailable(new SingleMessageProducer(UNARY_METHOD.streamRequest(1234L))); 428 // canceling the call should short circuit future halfClosed() calls. 429 streamListener.closed(Status.CANCELLED); 430 431 streamListener.messagesAvailable(new SingleMessageProducer(UNARY_METHOD.streamRequest(1234L))); 432 433 verify(callListener).onMessage(1234L); 434 } 435 436 @Test 437 public void streamListener_unexpectedRuntimeException() { 438 ServerStreamListenerImpl<Long> streamListener = 439 new ServerCallImpl.ServerStreamListenerImpl<Long>(call, callListener, context); 440 doThrow(new RuntimeException("unexpected exception")) 441 .when(callListener) 442 .onMessage(any(Long.class)); 443 444 InputStream inputStream = UNARY_METHOD.streamRequest(1234L); 445 446 thrown.expect(RuntimeException.class); 447 thrown.expectMessage("unexpected exception"); 448 streamListener.messagesAvailable(new SingleMessageProducer(inputStream)); 449 } 450 451 private static class LongMarshaller implements Marshaller<Long> { 452 @Override 453 public InputStream stream(Long value) { 454 return new ByteArrayInputStream(value.toString().getBytes(UTF_8)); 455 } 456 457 @Override 458 public Long parse(InputStream stream) { 459 try { 460 return Long.parseLong(CharStreams.toString(new InputStreamReader(stream, UTF_8))); 461 } catch (Exception e) { 462 throw new RuntimeException(e); 463 } 464 } 465 } 466 } 467