Home | History | Annotate | Download | only in integration
      1 /*
      2  * Copyright 2014 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.testing.integration;
     18 
     19 import com.google.common.base.Preconditions;
     20 import com.google.common.collect.Queues;
     21 import com.google.protobuf.ByteString;
     22 import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
     23 import io.grpc.Metadata;
     24 import io.grpc.ServerCall;
     25 import io.grpc.ServerCallHandler;
     26 import io.grpc.ServerInterceptor;
     27 import io.grpc.Status;
     28 import io.grpc.internal.LogExceptionRunnable;
     29 import io.grpc.stub.ServerCallStreamObserver;
     30 import io.grpc.stub.StreamObserver;
     31 import io.grpc.testing.integration.Messages.Payload;
     32 import io.grpc.testing.integration.Messages.PayloadType;
     33 import io.grpc.testing.integration.Messages.ResponseParameters;
     34 import io.grpc.testing.integration.Messages.SimpleRequest;
     35 import io.grpc.testing.integration.Messages.SimpleResponse;
     36 import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
     37 import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
     38 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
     39 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
     40 import java.io.IOException;
     41 import java.io.InputStream;
     42 import java.util.ArrayDeque;
     43 import java.util.Arrays;
     44 import java.util.HashSet;
     45 import java.util.List;
     46 import java.util.Queue;
     47 import java.util.Random;
     48 import java.util.Set;
     49 import java.util.concurrent.Future;
     50 import java.util.concurrent.ScheduledExecutorService;
     51 import java.util.concurrent.TimeUnit;
     52 import javax.annotation.concurrent.GuardedBy;
     53 
     54 /**
     55  * Implementation of the business logic for the TestService. Uses an executor to schedule chunks
     56  * sent in response streams.
     57  */
     58 public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
     59   private static final String UNCOMPRESSABLE_FILE =
     60       "/io/grpc/testing/integration/testdata/uncompressable.bin";
     61   private final Random random = new Random();
     62 
     63   private final ScheduledExecutorService executor;
     64   private final ByteString uncompressableBuffer;
     65   private final ByteString compressableBuffer;
     66 
     67   /**
     68    * Constructs a controller using the given executor for scheduling response stream chunks.
     69    */
     70   public TestServiceImpl(ScheduledExecutorService executor) {
     71     this.executor = executor;
     72     this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
     73     this.uncompressableBuffer = createBufferFromFile(UNCOMPRESSABLE_FILE);
     74   }
     75 
     76   @Override
     77   public void emptyCall(EmptyProtos.Empty empty,
     78       StreamObserver<EmptyProtos.Empty> responseObserver) {
     79     responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
     80     responseObserver.onCompleted();
     81   }
     82 
     83   /**
     84    * Immediately responds with a payload of the type and size specified in the request.
     85    */
     86   @Override
     87   public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
     88     ServerCallStreamObserver<SimpleResponse> obs =
     89         (ServerCallStreamObserver<SimpleResponse>) responseObserver;
     90     SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
     91     try {
     92       if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) {
     93         obs.setCompression("gzip");
     94       } else {
     95         obs.setCompression("identity");
     96       }
     97     } catch (IllegalArgumentException e) {
     98       obs.onError(Status.UNIMPLEMENTED
     99           .withDescription("compression not supported.")
    100           .withCause(e)
    101           .asRuntimeException());
    102       return;
    103     }
    104 
    105     if (req.getResponseSize() != 0) {
    106       boolean compressable = compressableResponse(req.getResponseType());
    107       ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
    108       // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
    109       // TODO(wonderfly): whether or not this is a good approach needs further discussion.
    110       int offset = random.nextInt(
    111           compressable ? compressableBuffer.size() : uncompressableBuffer.size());
    112       ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize());
    113       responseBuilder.setPayload(
    114           Payload.newBuilder()
    115               .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
    116               .setBody(payload));
    117     }
    118 
    119     if (req.hasResponseStatus()) {
    120       obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
    121           .withDescription(req.getResponseStatus().getMessage())
    122           .asRuntimeException());
    123       return;
    124     }
    125 
    126     responseObserver.onNext(responseBuilder.build());
    127     responseObserver.onCompleted();
    128   }
    129 
    130   /**
    131    * Given a request that specifies chunk size and interval between responses, creates and schedules
    132    * the response stream.
    133    */
    134   @Override
    135   public void streamingOutputCall(StreamingOutputCallRequest request,
    136       StreamObserver<StreamingOutputCallResponse> responseObserver) {
    137     // Create and start the response dispatcher.
    138     new ResponseDispatcher(responseObserver).enqueue(toChunkQueue(request)).completeInput();
    139   }
    140 
    141   /**
    142    * Waits until we have received all of the request messages and then returns the aggregate payload
    143    * size for all of the received requests.
    144    */
    145   @Override
    146   public StreamObserver<Messages.StreamingInputCallRequest> streamingInputCall(
    147       final StreamObserver<Messages.StreamingInputCallResponse> responseObserver) {
    148     return new StreamObserver<StreamingInputCallRequest>() {
    149       private int totalPayloadSize;
    150 
    151       @Override
    152       public void onNext(StreamingInputCallRequest message) {
    153         totalPayloadSize += message.getPayload().getBody().size();
    154       }
    155 
    156       @Override
    157       public void onCompleted() {
    158         responseObserver.onNext(StreamingInputCallResponse.newBuilder()
    159             .setAggregatedPayloadSize(totalPayloadSize).build());
    160         responseObserver.onCompleted();
    161       }
    162 
    163       @Override
    164       public void onError(Throwable cause) {
    165         responseObserver.onError(cause);
    166       }
    167     };
    168   }
    169 
    170   /**
    171    * True bi-directional streaming. Processes requests as they come in. Begins streaming results
    172    * immediately.
    173    */
    174   @Override
    175   public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
    176       final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
    177     final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
    178     return new StreamObserver<StreamingOutputCallRequest>() {
    179       @Override
    180       public void onNext(StreamingOutputCallRequest request) {
    181         if (request.hasResponseStatus()) {
    182           dispatcher.cancel();
    183           dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode())
    184               .withDescription(request.getResponseStatus().getMessage())
    185               .asRuntimeException());
    186           return;
    187         }
    188         dispatcher.enqueue(toChunkQueue(request));
    189       }
    190 
    191       @Override
    192       public void onCompleted() {
    193         if (!dispatcher.isCancelled()) {
    194           // Tell the dispatcher that all input has been received.
    195           dispatcher.completeInput();
    196         }
    197       }
    198 
    199       @Override
    200       public void onError(Throwable cause) {
    201         dispatcher.onError(cause);
    202       }
    203     };
    204   }
    205 
    206   /**
    207    * Similar to {@link #fullDuplexCall}, except that it waits for all streaming requests to be
    208    * received before starting the streaming responses.
    209    */
    210   @Override
    211   public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
    212       final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
    213     final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
    214     final Queue<Chunk> chunks = new ArrayDeque<Chunk>();
    215     return new StreamObserver<StreamingOutputCallRequest>() {
    216       @Override
    217       public void onNext(StreamingOutputCallRequest request) {
    218         chunks.addAll(toChunkQueue(request));
    219       }
    220 
    221       @Override
    222       public void onCompleted() {
    223         // Dispatch all of the chunks in one shot.
    224         dispatcher.enqueue(chunks).completeInput();
    225       }
    226 
    227       @Override
    228       public void onError(Throwable cause) {
    229         dispatcher.onError(cause);
    230       }
    231     };
    232   }
    233 
    234   /**
    235    * Schedules the dispatch of a queue of chunks. Whenever chunks are added or input is completed,
    236    * the next response chunk is scheduled for delivery to the client. When no more chunks are
    237    * available, the stream is half-closed.
    238    */
    239   private class ResponseDispatcher {
    240     private final Chunk completionChunk = new Chunk(0, 0, 0, false);
    241     private final Queue<Chunk> chunks;
    242     private final StreamObserver<StreamingOutputCallResponse> responseStream;
    243     private boolean scheduled;
    244     @GuardedBy("this") private boolean cancelled;
    245     private Throwable failure;
    246     private Runnable dispatchTask = new Runnable() {
    247       @Override
    248       public void run() {
    249         try {
    250 
    251           // Dispatch the current chunk to the client.
    252           try {
    253             dispatchChunk();
    254           } catch (RuntimeException e) {
    255             // Indicate that nothing is scheduled and re-throw.
    256             synchronized (ResponseDispatcher.this) {
    257               scheduled = false;
    258             }
    259             throw e;
    260           }
    261 
    262           // Schedule the next chunk if there is one.
    263           synchronized (ResponseDispatcher.this) {
    264             // Indicate that nothing is scheduled.
    265             scheduled = false;
    266             scheduleNextChunk();
    267           }
    268         } catch (Throwable t) {
    269           t.printStackTrace();
    270         }
    271       }
    272     };
    273 
    274     /**
    275      * The {@link StreamObserver} will be used to send the queue of response chunks. Since calls to
    276      * {@link StreamObserver} must be synchronized across threads, no further calls should be made
    277      * directly on {@code responseStream} after it is provided to the {@link ResponseDispatcher}.
    278      */
    279     public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
    280       this.chunks = Queues.newLinkedBlockingQueue();
    281       this.responseStream = responseStream;
    282     }
    283 
    284     /**
    285      * Adds the given chunks to the response stream and schedules the next chunk to be delivered if
    286      * needed.
    287      */
    288     public synchronized ResponseDispatcher enqueue(Queue<Chunk> moreChunks) {
    289       assertNotFailed();
    290       chunks.addAll(moreChunks);
    291       scheduleNextChunk();
    292       return this;
    293     }
    294 
    295     /**
    296      * Indicates that the input is completed and the currently enqueued response chunks are all that
    297      * remain to be scheduled for dispatch to the client.
    298      */
    299     public ResponseDispatcher completeInput() {
    300       assertNotFailed();
    301       chunks.add(completionChunk);
    302       scheduleNextChunk();
    303       return this;
    304     }
    305 
    306     /**
    307      * Allows the service to cancel the remaining responses.
    308      */
    309     public synchronized void cancel() {
    310       Preconditions.checkState(!cancelled, "Dispatcher already cancelled");
    311       chunks.clear();
    312       cancelled = true;
    313     }
    314 
    315     public synchronized boolean isCancelled() {
    316       return cancelled;
    317     }
    318 
    319     private synchronized void onError(Throwable cause) {
    320       responseStream.onError(cause);
    321     }
    322 
    323     /**
    324      * Dispatches the current response chunk to the client. This is only called by the executor. At
    325      * any time, a given dispatch task should only be registered with the executor once.
    326      */
    327     private synchronized void dispatchChunk() {
    328       if (cancelled) {
    329         return;
    330       }
    331       try {
    332         // Pop off the next chunk and send it to the client.
    333         Chunk chunk = chunks.remove();
    334         if (chunk == completionChunk) {
    335           responseStream.onCompleted();
    336         } else {
    337           responseStream.onNext(chunk.toResponse());
    338         }
    339       } catch (Throwable e) {
    340         failure = e;
    341         if (Status.fromThrowable(e).getCode() == Status.CANCELLED.getCode()) {
    342           // Stream was cancelled by client, responseStream.onError() might be called already or
    343           // will be called soon by inbounding StreamObserver.
    344           chunks.clear();
    345         } else {
    346           responseStream.onError(e);
    347         }
    348       }
    349     }
    350 
    351     /**
    352      * Schedules the next response chunk to be dispatched. If all input has been received and there
    353      * are no more chunks in the queue, the stream is closed.
    354      */
    355     private void scheduleNextChunk() {
    356       synchronized (this) {
    357         if (scheduled) {
    358           // Dispatch task is already scheduled.
    359           return;
    360         }
    361 
    362         // Schedule the next response chunk if there is one.
    363         Chunk nextChunk = chunks.peek();
    364         if (nextChunk != null) {
    365           scheduled = true;
    366           // TODO(ejona): cancel future if RPC is cancelled
    367           Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask),
    368               nextChunk.delayMicroseconds, TimeUnit.MICROSECONDS);
    369           return;
    370         }
    371       }
    372     }
    373 
    374     private void assertNotFailed() {
    375       if (failure != null) {
    376         throw new IllegalStateException("Stream already failed", failure);
    377       }
    378     }
    379   }
    380 
    381   /**
    382    * Breaks down the request and creates a queue of response chunks for the given request.
    383    */
    384   public Queue<Chunk> toChunkQueue(StreamingOutputCallRequest request) {
    385     Queue<Chunk> chunkQueue = new ArrayDeque<Chunk>();
    386     int offset = 0;
    387     boolean compressable = compressableResponse(request.getResponseType());
    388     for (ResponseParameters params : request.getResponseParametersList()) {
    389       chunkQueue.add(new Chunk(params.getIntervalUs(), offset, params.getSize(), compressable));
    390 
    391       // Increment the offset past this chunk.
    392       // Both buffers need to be circular.
    393       offset = (offset + params.getSize())
    394           % (compressable ? compressableBuffer.size() : uncompressableBuffer.size());
    395     }
    396     return chunkQueue;
    397   }
    398 
    399   /**
    400    * A single chunk of a response stream. Contains delivery information for the dispatcher and can
    401    * be converted to a streaming response proto. A chunk just references it's payload in the
    402    * {@link #uncompressableBuffer} array. The payload isn't actually created until {@link
    403    * #toResponse()} is called.
    404    */
    405   private class Chunk {
    406     private final int delayMicroseconds;
    407     private final int offset;
    408     private final int length;
    409     private final boolean compressable;
    410 
    411     public Chunk(int delayMicroseconds, int offset, int length, boolean compressable) {
    412       this.delayMicroseconds = delayMicroseconds;
    413       this.offset = offset;
    414       this.length = length;
    415       this.compressable = compressable;
    416     }
    417 
    418     /**
    419      * Convert this chunk into a streaming response proto.
    420      */
    421     private StreamingOutputCallResponse toResponse() {
    422       StreamingOutputCallResponse.Builder responseBuilder =
    423           StreamingOutputCallResponse.newBuilder();
    424       ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
    425       ByteString payload = generatePayload(dataBuffer, offset, length);
    426       responseBuilder.setPayload(
    427           Payload.newBuilder()
    428               .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
    429               .setBody(payload));
    430       return responseBuilder.build();
    431     }
    432   }
    433 
    434   /**
    435    * Creates a buffer with data read from a file.
    436    */
    437   @SuppressWarnings("Finally") // Not concerned about suppression; expected to be exceedingly rare
    438   private ByteString createBufferFromFile(String fileClassPath) {
    439     ByteString buffer = ByteString.EMPTY;
    440     InputStream inputStream = getClass().getResourceAsStream(fileClassPath);
    441     if (inputStream == null) {
    442       throw new IllegalArgumentException("Unable to locate file on classpath: " + fileClassPath);
    443     }
    444 
    445     try {
    446       buffer = ByteString.readFrom(inputStream);
    447     } catch (IOException e) {
    448       throw new RuntimeException(e);
    449     } finally {
    450       try {
    451         inputStream.close();
    452       } catch (IOException ignorable) {
    453         // ignore
    454       }
    455     }
    456     return buffer;
    457   }
    458 
    459   /**
    460    * Indicates whether or not the response for this type should be compressable. If {@code RANDOM},
    461    * picks a random boolean.
    462    */
    463   private boolean compressableResponse(PayloadType responseType) {
    464     switch (responseType) {
    465       case COMPRESSABLE:
    466         return true;
    467       case RANDOM:
    468         return random.nextBoolean();
    469       case UNCOMPRESSABLE:
    470       default:
    471         return false;
    472     }
    473   }
    474 
    475   /**
    476    * Generates a payload of desired type and size. Reads compressableBuffer or
    477    * uncompressableBuffer as a circular buffer.
    478    */
    479   private ByteString generatePayload(ByteString dataBuffer, int offset, int size) {
    480     ByteString payload = ByteString.EMPTY;
    481     // This offset would never pass the array boundary.
    482     int begin = offset;
    483     int end = 0;
    484     int bytesLeft = size;
    485     while (bytesLeft > 0) {
    486       end = Math.min(begin + bytesLeft, dataBuffer.size());
    487       // ByteString.substring returns the substring from begin, inclusive, to end, exclusive.
    488       payload = payload.concat(dataBuffer.substring(begin, end));
    489       bytesLeft -= (end - begin);
    490       begin = end % dataBuffer.size();
    491     }
    492     return payload;
    493   }
    494 
    495   /** Returns interceptors necessary for full service implementation. */
    496   public static List<ServerInterceptor> interceptors() {
    497     return Arrays.asList(
    498         echoRequestHeadersInterceptor(Util.METADATA_KEY),
    499         echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY),
    500         echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY));
    501   }
    502 
    503   /**
    504    * Echo the request headers from a client into response headers and trailers. Useful for
    505    * testing end-to-end metadata propagation.
    506    */
    507   private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
    508     final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
    509     return new ServerInterceptor() {
    510       @Override
    511       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    512           ServerCall<ReqT, RespT> call,
    513           final Metadata requestHeaders,
    514           ServerCallHandler<ReqT, RespT> next) {
    515         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
    516               @Override
    517               public void sendHeaders(Metadata responseHeaders) {
    518                 responseHeaders.merge(requestHeaders, keySet);
    519                 super.sendHeaders(responseHeaders);
    520               }
    521 
    522               @Override
    523               public void close(Status status, Metadata trailers) {
    524                 trailers.merge(requestHeaders, keySet);
    525                 super.close(status, trailers);
    526               }
    527             }, requestHeaders);
    528       }
    529     };
    530   }
    531 
    532   /**
    533    * Echoes request headers with the specified key(s) from a client into response headers only.
    534    */
    535   private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
    536     final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
    537     return new ServerInterceptor() {
    538       @Override
    539       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    540           ServerCall<ReqT, RespT> call,
    541           final Metadata requestHeaders,
    542           ServerCallHandler<ReqT, RespT> next) {
    543         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
    544           @Override
    545           public void sendHeaders(Metadata responseHeaders) {
    546             responseHeaders.merge(requestHeaders, keySet);
    547             super.sendHeaders(responseHeaders);
    548           }
    549 
    550           @Override
    551           public void close(Status status, Metadata trailers) {
    552             super.close(status, trailers);
    553           }
    554         }, requestHeaders);
    555       }
    556     };
    557   }
    558 
    559   /**
    560    * Echoes request headers with the specified key(s) from a client into response trailers only.
    561    */
    562   private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
    563     final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
    564     return new ServerInterceptor() {
    565       @Override
    566       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    567           ServerCall<ReqT, RespT> call,
    568           final Metadata requestHeaders,
    569           ServerCallHandler<ReqT, RespT> next) {
    570         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
    571           @Override
    572           public void sendHeaders(Metadata responseHeaders) {
    573             super.sendHeaders(responseHeaders);
    574           }
    575 
    576           @Override
    577           public void close(Status status, Metadata trailers) {
    578             trailers.merge(requestHeaders, keySet);
    579             super.close(status, trailers);
    580           }
    581         }, requestHeaders);
    582       }
    583     };
    584   }
    585 }
    586