Home | History | Annotate | Download | only in cronet
      1 /*
      2  * Copyright 2016 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.cronet;
     18 
     19 import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
     20 import static io.grpc.internal.GrpcUtil.TE_HEADER;
     21 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
     22 
     23 // TODO(ericgribkoff): Consider changing from android.util.Log to java logging.
     24 import android.util.Log;
     25 import com.google.common.annotations.VisibleForTesting;
     26 import com.google.common.base.Preconditions;
     27 import com.google.common.io.BaseEncoding;
     28 import io.grpc.Attributes;
     29 import io.grpc.CallOptions;
     30 import io.grpc.InternalMetadata;
     31 import io.grpc.Metadata;
     32 import io.grpc.MethodDescriptor;
     33 import io.grpc.Status;
     34 import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
     35 import io.grpc.internal.AbstractClientStream;
     36 import io.grpc.internal.GrpcUtil;
     37 import io.grpc.internal.Http2ClientStreamTransportState;
     38 import io.grpc.internal.ReadableBuffers;
     39 import io.grpc.internal.StatsTraceContext;
     40 import io.grpc.internal.TransportFrameUtil;
     41 import io.grpc.internal.TransportTracer;
     42 import io.grpc.internal.WritableBuffer;
     43 import java.nio.ByteBuffer;
     44 import java.nio.charset.Charset;
     45 import java.util.ArrayList;
     46 import java.util.Collection;
     47 import java.util.LinkedList;
     48 import java.util.List;
     49 import java.util.Map;
     50 import java.util.Queue;
     51 import java.util.concurrent.Executor;
     52 import javax.annotation.Nullable;
     53 import javax.annotation.concurrent.GuardedBy;
     54 import org.chromium.net.BidirectionalStream;
     55 import org.chromium.net.CronetException;
     56 import org.chromium.net.ExperimentalBidirectionalStream;
     57 import org.chromium.net.UrlResponseInfo;
     58 
     59 /**
     60  * Client stream for the cronet transport.
     61  */
     62 class CronetClientStream extends AbstractClientStream {
     63   private static final int READ_BUFFER_CAPACITY = 4 * 1024;
     64   private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
     65   private static final String LOG_TAG = "grpc-java-cronet";
     66   private final String url;
     67   private final String userAgent;
     68   private final StatsTraceContext statsTraceCtx;
     69   private final Executor executor;
     70   private final Metadata headers;
     71   private final CronetClientTransport transport;
     72   private final Runnable startCallback;
     73   @VisibleForTesting
     74   final boolean idempotent;
     75   private BidirectionalStream stream;
     76   private final boolean delayRequestHeader;
     77   private final Object annotation;
     78   private final Collection<Object> annotations;
     79   private final TransportState state;
     80   private final Sink sink = new Sink();
     81   private StreamBuilderFactory streamFactory;
     82 
     83   CronetClientStream(
     84       final String url,
     85       @Nullable String userAgent,
     86       Executor executor,
     87       final Metadata headers,
     88       CronetClientTransport transport,
     89       Runnable startCallback,
     90       Object lock,
     91       int maxMessageSize,
     92       boolean alwaysUsePut,
     93       MethodDescriptor<?, ?> method,
     94       StatsTraceContext statsTraceCtx,
     95       CallOptions callOptions,
     96       TransportTracer transportTracer) {
     97     super(
     98         new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers,
     99         method.isSafe());
    100     this.url = Preconditions.checkNotNull(url, "url");
    101     this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent");
    102     this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
    103     this.executor = Preconditions.checkNotNull(executor, "executor");
    104     this.headers = Preconditions.checkNotNull(headers, "headers");
    105     this.transport = Preconditions.checkNotNull(transport, "transport");
    106     this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback");
    107     this.idempotent = method.isIdempotent() || alwaysUsePut;
    108     // Only delay flushing header for unary rpcs.
    109     this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY);
    110     this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY);
    111     this.annotations = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATIONS_KEY);
    112     this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer);
    113   }
    114 
    115   @Override
    116   protected TransportState transportState() {
    117     return state;
    118   }
    119 
    120   @Override
    121   protected Sink abstractClientStreamSink() {
    122     return sink;
    123   }
    124 
    125   @Override
    126   public void setAuthority(String authority) {
    127     throw new UnsupportedOperationException("Cronet does not support overriding authority");
    128   }
    129 
    130   class Sink implements AbstractClientStream.Sink {
    131     @Override
    132     public void writeHeaders(Metadata metadata, byte[] payload) {
    133       startCallback.run();
    134 
    135       BidirectionalStreamCallback callback = new BidirectionalStreamCallback();
    136       String path = url;
    137       if (payload != null) {
    138         path += "?" + BaseEncoding.base64().encode(payload);
    139       }
    140       BidirectionalStream.Builder builder =
    141           streamFactory.newBidirectionalStreamBuilder(path, callback, executor);
    142       if (payload != null) {
    143         builder.setHttpMethod("GET");
    144       } else if (idempotent) {
    145         builder.setHttpMethod("PUT");
    146       }
    147       if (delayRequestHeader) {
    148         builder.delayRequestHeadersUntilFirstFlush(true);
    149       }
    150       if (annotation != null) {
    151         ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(annotation);
    152       }
    153       if (annotations != null) {
    154         for (Object o : annotations) {
    155           ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(o);
    156         }
    157       }
    158       setGrpcHeaders(builder);
    159       stream = builder.build();
    160       stream.start();
    161     }
    162 
    163     @Override
    164     public void writeFrame(
    165         WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages) {
    166       synchronized (state.lock) {
    167         if (state.cancelSent) {
    168           return;
    169         }
    170         ByteBuffer byteBuffer;
    171         if (buffer != null) {
    172           byteBuffer = ((CronetWritableBuffer) buffer).buffer();
    173           byteBuffer.flip();
    174         } else {
    175           byteBuffer = EMPTY_BUFFER;
    176         }
    177         onSendingBytes(byteBuffer.remaining());
    178         if (!state.streamReady) {
    179           state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush));
    180         } else {
    181           streamWrite(byteBuffer, endOfStream, flush);
    182         }
    183       }
    184     }
    185 
    186     @Override
    187     public void request(final int numMessages) {
    188       synchronized (state.lock) {
    189         state.requestMessagesFromDeframer(numMessages);
    190       }
    191     }
    192 
    193     @Override
    194     public void cancel(Status reason) {
    195       synchronized (state.lock) {
    196         if (state.cancelSent) {
    197           return;
    198         }
    199         state.cancelSent = true;
    200         state.cancelReason = reason;
    201         state.clearPendingData();
    202         if (stream != null) {
    203           // Will report stream finish when BidirectionalStreamCallback.onCanceled is called.
    204           stream.cancel();
    205         } else {
    206           transport.finishStream(CronetClientStream.this, reason);
    207         }
    208       }
    209     }
    210   }
    211 
    212   class TransportState extends Http2ClientStreamTransportState {
    213     private final Object lock;
    214     @GuardedBy("lock")
    215     private Queue<PendingData> pendingData = new LinkedList<PendingData>();
    216     @GuardedBy("lock")
    217     private boolean streamReady;
    218     @GuardedBy("lock")
    219     private boolean cancelSent = false;
    220     @GuardedBy("lock")
    221     private int bytesPendingProcess;
    222     @GuardedBy("lock")
    223     private Status cancelReason;
    224     @GuardedBy("lock")
    225     private boolean readClosed;
    226     @GuardedBy("lock")
    227     private boolean firstWriteComplete;
    228 
    229     public TransportState(
    230         int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock,
    231         TransportTracer transportTracer) {
    232       super(maxMessageSize, statsTraceCtx, transportTracer);
    233       this.lock = Preconditions.checkNotNull(lock, "lock");
    234     }
    235 
    236     @GuardedBy("lock")
    237     public void start(StreamBuilderFactory factory) {
    238       streamFactory = factory;
    239     }
    240 
    241     @GuardedBy("lock")
    242     @Override
    243     protected void onStreamAllocated() {
    244       super.onStreamAllocated();
    245     }
    246 
    247     @GuardedBy("lock")
    248     @Override
    249     protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
    250       stream.cancel();
    251       transportReportStatus(status, stopDelivery, trailers);
    252     }
    253 
    254     @GuardedBy("lock")
    255     @Override
    256     public void deframeFailed(Throwable cause) {
    257       http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
    258     }
    259 
    260     @Override
    261     public void runOnTransportThread(final Runnable r) {
    262       synchronized (lock) {
    263         r.run();
    264       }
    265     }
    266 
    267     @GuardedBy("lock")
    268     @Override
    269     public void bytesRead(int processedBytes) {
    270       bytesPendingProcess -= processedBytes;
    271       if (bytesPendingProcess == 0 && !readClosed) {
    272         if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    273           Log.v(LOG_TAG, "BidirectionalStream.read");
    274         }
    275         stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
    276       }
    277     }
    278 
    279     @GuardedBy("lock")
    280     private void transportHeadersReceived(Metadata metadata, boolean endOfStream) {
    281       if (endOfStream) {
    282         transportTrailersReceived(metadata);
    283       } else {
    284         transportHeadersReceived(metadata);
    285       }
    286     }
    287 
    288     @GuardedBy("lock")
    289     private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) {
    290       bytesPendingProcess += buffer.remaining();
    291       super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream);
    292     }
    293 
    294     @GuardedBy("lock")
    295     private void clearPendingData() {
    296       for (PendingData data : pendingData) {
    297         data.buffer.clear();
    298       }
    299       pendingData.clear();
    300     }
    301 
    302     @GuardedBy("lock")
    303     private void enqueuePendingData(PendingData data) {
    304       pendingData.add(data);
    305     }
    306 
    307     @GuardedBy("lock")
    308     private void writeAllPendingData() {
    309       for (PendingData data : pendingData) {
    310         streamWrite(data.buffer, data.endOfStream, data.flush);
    311       }
    312       pendingData.clear();
    313     }
    314   }
    315 
    316   // TODO(ericgribkoff): move header related method to a common place like GrpcUtil.
    317   private static boolean isApplicationHeader(String key) {
    318     // Don't allow reserved non HTTP/2 pseudo headers to be added
    319     // HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character.
    320     return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key)
    321         && !USER_AGENT_KEY.name().equalsIgnoreCase(key)
    322         && !TE_HEADER.name().equalsIgnoreCase(key);
    323   }
    324 
    325   private void setGrpcHeaders(BidirectionalStream.Builder builder) {
    326     // Psuedo-headers are set by cronet.
    327     // All non-pseudo headers must come after pseudo headers.
    328     // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed.
    329     builder.addHeader(USER_AGENT_KEY.name(), userAgent);
    330     builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
    331     builder.addHeader("te", GrpcUtil.TE_TRAILERS);
    332 
    333     // Now add any application-provided headers.
    334     // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between
    335     // String and byte array.
    336     byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
    337     for (int i = 0; i < serializedHeaders.length; i += 2) {
    338       String key = new String(serializedHeaders[i], Charset.forName("UTF-8"));
    339       // TODO(ericgribkoff): log an error or throw an exception
    340       if (isApplicationHeader(key)) {
    341         String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8"));
    342         builder.addHeader(key, value);
    343       }
    344     }
    345   }
    346 
    347   private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) {
    348     if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    349       Log.v(LOG_TAG, "BidirectionalStream.write");
    350     }
    351     stream.write(buffer, endOfStream);
    352     if (flush) {
    353       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    354         Log.v(LOG_TAG, "BidirectionalStream.flush");
    355       }
    356       stream.flush();
    357     }
    358   }
    359 
    360   private void finishStream(Status status) {
    361     transport.finishStream(this, status);
    362   }
    363 
    364   @Override
    365   public Attributes getAttributes() {
    366     return Attributes.EMPTY;
    367   }
    368 
    369   class BidirectionalStreamCallback extends BidirectionalStream.Callback {
    370     private List<Map.Entry<String, String>> trailerList;
    371 
    372     @Override
    373     public void onStreamReady(BidirectionalStream stream) {
    374       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    375         Log.v(LOG_TAG, "onStreamReady");
    376       }
    377       synchronized (state.lock) {
    378         // Now that the stream is ready, call the listener's onReady callback if
    379         // appropriate.
    380         state.onStreamAllocated();
    381         state.streamReady = true;
    382         state.writeAllPendingData();
    383       }
    384     }
    385 
    386     @Override
    387     public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
    388       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    389         Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList());
    390         Log.v(LOG_TAG, "BidirectionalStream.read");
    391       }
    392       reportHeaders(info.getAllHeadersAsList(), false);
    393       stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
    394     }
    395 
    396     @Override
    397     public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info,
    398         ByteBuffer buffer, boolean endOfStream) {
    399       buffer.flip();
    400       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    401         Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining());
    402       }
    403 
    404       synchronized (state.lock) {
    405         state.readClosed = endOfStream;
    406         // The endOfStream in gRPC has a different meaning so we always call transportDataReceived
    407         // with endOfStream=false.
    408         if (buffer.remaining() != 0) {
    409           state.transportDataReceived(buffer, false);
    410         }
    411       }
    412       if (endOfStream && trailerList != null) {
    413         // Process trailers if we have already received any.
    414         reportHeaders(trailerList, true);
    415       }
    416     }
    417 
    418     @Override
    419     public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
    420         ByteBuffer buffer, boolean endOfStream) {
    421       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    422         Log.v(LOG_TAG, "onWriteCompleted");
    423       }
    424       synchronized (state.lock) {
    425         if (!state.firstWriteComplete) {
    426           // Cronet API doesn't notify when headers are written to wire, but it occurs before first
    427           // onWriteCompleted callback.
    428           state.firstWriteComplete = true;
    429           statsTraceCtx.clientOutboundHeaders();
    430         }
    431         state.onSentBytes(buffer.position());
    432       }
    433     }
    434 
    435     @Override
    436     public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info,
    437         UrlResponseInfo.HeaderBlock trailers) {
    438       processTrailers(trailers.getAsList());
    439     }
    440 
    441     // We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be
    442     // mocked.
    443     @VisibleForTesting
    444     void processTrailers(List<Map.Entry<String, String>> trailerList) {
    445       this.trailerList = trailerList;
    446       boolean readClosed;
    447       synchronized (state.lock) {
    448         readClosed = state.readClosed;
    449       }
    450       if (readClosed) {
    451         // There's no pending onReadCompleted callback so we can report trailers now.
    452         reportHeaders(trailerList, true);
    453       }
    454       // Otherwise report trailers in onReadCompleted, or onSucceeded.
    455       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    456         Log.v(LOG_TAG, "onResponseTrailersReceived. Trailer=" + trailerList.toString());
    457       }
    458     }
    459 
    460     @Override
    461     public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
    462       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    463         Log.v(LOG_TAG, "onSucceeded");
    464       }
    465 
    466       if (!haveTrailersBeenReported()) {
    467         if (trailerList != null) {
    468           reportHeaders(trailerList, true);
    469         } else if (info != null) {
    470           reportHeaders(info.getAllHeadersAsList(), true);
    471         } else {
    472           throw new AssertionError("No response header or trailer");
    473         }
    474       }
    475       finishStream(toGrpcStatus(info));
    476     }
    477 
    478     @Override
    479     public void onFailed(BidirectionalStream stream, UrlResponseInfo info,
    480         CronetException error) {
    481       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    482         Log.v(LOG_TAG, "onFailed");
    483       }
    484       finishStream(Status.UNAVAILABLE.withCause(error));
    485     }
    486 
    487     @Override
    488     public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
    489       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
    490         Log.v(LOG_TAG, "onCanceled");
    491       }
    492       Status status;
    493       synchronized (state.lock) {
    494         if (state.cancelReason != null) {
    495           status = state.cancelReason;
    496         } else if (info != null) {
    497           status = toGrpcStatus(info);
    498         } else {
    499           status = Status.CANCELLED.withDescription("stream cancelled without reason");
    500         }
    501       }
    502       finishStream(status);
    503     }
    504 
    505     private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) {
    506       // TODO(ericgribkoff): create new utility methods to eliminate all these conversions
    507       List<String> headerList = new ArrayList<>();
    508       for (Map.Entry<String, String> entry : headers) {
    509         headerList.add(entry.getKey());
    510         headerList.add(entry.getValue());
    511       }
    512 
    513       byte[][] headerValues = new byte[headerList.size()][];
    514       for (int i = 0; i < headerList.size(); i += 2) {
    515         headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8"));
    516         headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8"));
    517       }
    518       Metadata metadata =
    519           InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues));
    520       synchronized (state.lock) {
    521         // There's no pending onReadCompleted callback so we can report trailers now.
    522         state.transportHeadersReceived(metadata, endOfStream);
    523       }
    524     }
    525 
    526     private boolean haveTrailersBeenReported() {
    527       synchronized (state.lock) {
    528         return trailerList != null && state.readClosed;
    529       }
    530     }
    531 
    532     private Status toGrpcStatus(UrlResponseInfo info) {
    533       return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode());
    534     }
    535   }
    536 
    537   private static class PendingData {
    538     ByteBuffer buffer;
    539     boolean endOfStream;
    540     boolean flush;
    541 
    542     PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) {
    543       this.buffer = buffer;
    544       this.endOfStream = endOfStream;
    545       this.flush = flush;
    546     }
    547   }
    548 }
    549