Home | History | Annotate | Download | only in internal
      1 /*
      2  * Copyright 2014 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.internal;
     18 
     19 import static com.google.common.base.Preconditions.checkArgument;
     20 import static com.google.common.base.Preconditions.checkNotNull;
     21 import static com.google.common.base.Preconditions.checkState;
     22 import static java.lang.Math.min;
     23 
     24 import io.grpc.Codec;
     25 import io.grpc.Compressor;
     26 import io.grpc.Drainable;
     27 import io.grpc.KnownLength;
     28 import io.grpc.Status;
     29 import java.io.ByteArrayInputStream;
     30 import java.io.IOException;
     31 import java.io.InputStream;
     32 import java.io.OutputStream;
     33 import java.nio.ByteBuffer;
     34 import java.util.ArrayList;
     35 import java.util.List;
     36 import javax.annotation.Nullable;
     37 
     38 /**
     39  * Encodes gRPC messages to be delivered via the transport layer which implements {@link
     40  * MessageFramer.Sink}.
     41  */
     42 public class MessageFramer implements Framer {
     43 
     44   private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1;
     45 
     46   /**
     47    * Sink implemented by the transport layer to receive frames and forward them to their
     48    * destination.
     49    */
     50   public interface Sink {
     51     /**
     52      * Delivers a frame via the transport.
     53      *
     54      * @param frame a non-empty buffer to deliver or {@code null} if the framer is being
     55      *              closed and there is no data to deliver.
     56      * @param endOfStream whether the frame is the last one for the GRPC stream
     57      * @param flush {@code true} if more data may not be arriving soon
     58      * @param numMessages the number of messages that this series of frames represents
     59      */
     60     void deliverFrame(
     61         @Nullable WritableBuffer frame,
     62         boolean endOfStream,
     63         boolean flush,
     64         int numMessages);
     65   }
     66 
     67   private static final int HEADER_LENGTH = 5;
     68   private static final byte UNCOMPRESSED = 0;
     69   private static final byte COMPRESSED = 1;
     70 
     71   private final Sink sink;
     72   // effectively final.  Can only be set once.
     73   private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
     74   private WritableBuffer buffer;
     75   private Compressor compressor = Codec.Identity.NONE;
     76   private boolean messageCompression = true;
     77   private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
     78   private final byte[] headerScratch = new byte[HEADER_LENGTH];
     79   private final WritableBufferAllocator bufferAllocator;
     80   private final StatsTraceContext statsTraceCtx;
     81   // transportTracer is nullable until it is integrated with client transports
     82   private boolean closed;
     83 
     84   // Tracing and stats-related states
     85   private int messagesBuffered;
     86   private int currentMessageSeqNo = -1;
     87   private long currentMessageWireSize;
     88 
     89   /**
     90    * Creates a {@code MessageFramer}.
     91    *
     92    * @param sink the sink used to deliver frames to the transport
     93    * @param bufferAllocator allocates buffers that the transport can commit to the wire.
     94    */
     95   public MessageFramer(
     96       Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
     97     this.sink = checkNotNull(sink, "sink");
     98     this.bufferAllocator = checkNotNull(bufferAllocator, "bufferAllocator");
     99     this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
    100   }
    101 
    102   @Override
    103   public MessageFramer setCompressor(Compressor compressor) {
    104     this.compressor = checkNotNull(compressor, "Can't pass an empty compressor");
    105     return this;
    106   }
    107 
    108   @Override
    109   public MessageFramer setMessageCompression(boolean enable) {
    110     messageCompression = enable;
    111     return this;
    112   }
    113 
    114   @Override
    115   public void setMaxOutboundMessageSize(int maxSize) {
    116     checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set");
    117     maxOutboundMessageSize = maxSize;
    118   }
    119 
    120   /**
    121    * Writes out a payload message.
    122    *
    123    * @param message contains the message to be written out. It will be completely consumed.
    124    */
    125   @Override
    126   public void writePayload(InputStream message) {
    127     verifyNotClosed();
    128     messagesBuffered++;
    129     currentMessageSeqNo++;
    130     currentMessageWireSize = 0;
    131     statsTraceCtx.outboundMessage(currentMessageSeqNo);
    132     boolean compressed = messageCompression && compressor != Codec.Identity.NONE;
    133     int written = -1;
    134     int messageLength = -2;
    135     try {
    136       messageLength = getKnownLength(message);
    137       if (messageLength != 0 && compressed) {
    138         written = writeCompressed(message, messageLength);
    139       } else {
    140         written = writeUncompressed(message, messageLength);
    141       }
    142     } catch (IOException e) {
    143       // This should not be possible, since sink#deliverFrame doesn't throw.
    144       throw Status.INTERNAL
    145           .withDescription("Failed to frame message")
    146           .withCause(e)
    147           .asRuntimeException();
    148     } catch (RuntimeException e) {
    149       throw Status.INTERNAL
    150           .withDescription("Failed to frame message")
    151           .withCause(e)
    152           .asRuntimeException();
    153     }
    154 
    155     if (messageLength != -1 && written != messageLength) {
    156       String err = String.format("Message length inaccurate %s != %s", written, messageLength);
    157       throw Status.INTERNAL.withDescription(err).asRuntimeException();
    158     }
    159     statsTraceCtx.outboundUncompressedSize(written);
    160     statsTraceCtx.outboundWireSize(currentMessageWireSize);
    161     statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written);
    162   }
    163 
    164   private int writeUncompressed(InputStream message, int messageLength) throws IOException {
    165     if (messageLength != -1) {
    166       currentMessageWireSize = messageLength;
    167       return writeKnownLengthUncompressed(message, messageLength);
    168     }
    169     BufferChainOutputStream bufferChain = new BufferChainOutputStream();
    170     int written = writeToOutputStream(message, bufferChain);
    171     if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
    172       throw Status.RESOURCE_EXHAUSTED
    173           .withDescription(
    174               String.format("message too large %d > %d", written , maxOutboundMessageSize))
    175           .asRuntimeException();
    176     }
    177     writeBufferChain(bufferChain, false);
    178     return written;
    179   }
    180 
    181   private int writeCompressed(InputStream message, int unusedMessageLength) throws IOException {
    182     BufferChainOutputStream bufferChain = new BufferChainOutputStream();
    183 
    184     OutputStream compressingStream = compressor.compress(bufferChain);
    185     int written;
    186     try {
    187       written = writeToOutputStream(message, compressingStream);
    188     } finally {
    189       compressingStream.close();
    190     }
    191     if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
    192       throw Status.RESOURCE_EXHAUSTED
    193           .withDescription(
    194               String.format("message too large %d > %d", written , maxOutboundMessageSize))
    195           .asRuntimeException();
    196     }
    197 
    198     writeBufferChain(bufferChain, true);
    199     return written;
    200   }
    201 
    202   private int getKnownLength(InputStream inputStream) throws IOException {
    203     if (inputStream instanceof KnownLength || inputStream instanceof ByteArrayInputStream) {
    204       return inputStream.available();
    205     }
    206     return -1;
    207   }
    208 
    209   /**
    210    * Write an unserialized message with a known length, uncompressed.
    211    */
    212   private int writeKnownLengthUncompressed(InputStream message, int messageLength)
    213       throws IOException {
    214     if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
    215       throw Status.RESOURCE_EXHAUSTED
    216           .withDescription(
    217               String.format("message too large %d > %d", messageLength , maxOutboundMessageSize))
    218           .asRuntimeException();
    219     }
    220     ByteBuffer header = ByteBuffer.wrap(headerScratch);
    221     header.put(UNCOMPRESSED);
    222     header.putInt(messageLength);
    223     // Allocate the initial buffer chunk based on frame header + payload length.
    224     // Note that the allocator may allocate a buffer larger or smaller than this length
    225     if (buffer == null) {
    226       buffer = bufferAllocator.allocate(header.position() + messageLength);
    227     }
    228     writeRaw(headerScratch, 0, header.position());
    229     return writeToOutputStream(message, outputStreamAdapter);
    230   }
    231 
    232   /**
    233    * Write a message that has been serialized to a sequence of buffers.
    234    */
    235   private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
    236     ByteBuffer header = ByteBuffer.wrap(headerScratch);
    237     header.put(compressed ? COMPRESSED : UNCOMPRESSED);
    238     int messageLength = bufferChain.readableBytes();
    239     header.putInt(messageLength);
    240     WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
    241     writeableHeader.write(headerScratch, 0, header.position());
    242     if (messageLength == 0) {
    243       // the payload had 0 length so make the header the current buffer.
    244       buffer = writeableHeader;
    245       return;
    246     }
    247     // Note that we are always delivering a small message to the transport here which
    248     // may incur transport framing overhead as it may be sent separately to the contents
    249     // of the GRPC frame.
    250     // The final message may not be completely written because we do not flush the last buffer.
    251     // Do not report the last message as sent.
    252     sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1);
    253     messagesBuffered = 1;
    254     // Commit all except the last buffer to the sink
    255     List<WritableBuffer> bufferList = bufferChain.bufferList;
    256     for (int i = 0; i < bufferList.size() - 1; i++) {
    257       sink.deliverFrame(bufferList.get(i), false, false, 0);
    258     }
    259     // Assign the current buffer to the last in the chain so it can be used
    260     // for future writes or written with end-of-stream=true on close.
    261     buffer = bufferList.get(bufferList.size() - 1);
    262     currentMessageWireSize = messageLength;
    263   }
    264 
    265   private static int writeToOutputStream(InputStream message, OutputStream outputStream)
    266       throws IOException {
    267     if (message instanceof Drainable) {
    268       return ((Drainable) message).drainTo(outputStream);
    269     } else {
    270       // This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we
    271       // expect performance-critical code to support flushTo().
    272       long written = IoUtils.copy(message, outputStream);
    273       checkArgument(written <= Integer.MAX_VALUE, "Message size overflow: %s", written);
    274       return (int) written;
    275     }
    276   }
    277 
    278   private void writeRaw(byte[] b, int off, int len) {
    279     while (len > 0) {
    280       if (buffer != null && buffer.writableBytes() == 0) {
    281         commitToSink(false, false);
    282       }
    283       if (buffer == null) {
    284         // Request a buffer allocation using the message length as a hint.
    285         buffer = bufferAllocator.allocate(len);
    286       }
    287       int toWrite = min(len, buffer.writableBytes());
    288       buffer.write(b, off, toWrite);
    289       off += toWrite;
    290       len -= toWrite;
    291     }
    292   }
    293 
    294   /**
    295    * Flushes any buffered data in the framer to the sink.
    296    */
    297   @Override
    298   public void flush() {
    299     if (buffer != null && buffer.readableBytes() > 0) {
    300       commitToSink(false, true);
    301     }
    302   }
    303 
    304   /**
    305    * Indicates whether or not this framer has been closed via a call to either
    306    * {@link #close()} or {@link #dispose()}.
    307    */
    308   @Override
    309   public boolean isClosed() {
    310     return closed;
    311   }
    312 
    313   /**
    314    * Flushes and closes the framer and releases any buffers. After the framer is closed or
    315    * disposed, additional calls to this method will have no affect.
    316    */
    317   @Override
    318   public void close() {
    319     if (!isClosed()) {
    320       closed = true;
    321       // With the current code we don't expect readableBytes > 0 to be possible here, added
    322       // defensively to prevent buffer leak issues if the framer code changes later.
    323       if (buffer != null && buffer.readableBytes() == 0) {
    324         releaseBuffer();
    325       }
    326       commitToSink(true, true);
    327     }
    328   }
    329 
    330   /**
    331    * Closes the framer and releases any buffers, but does not flush. After the framer is
    332    * closed or disposed, additional calls to this method will have no affect.
    333    */
    334   @Override
    335   public void dispose() {
    336     closed = true;
    337     releaseBuffer();
    338   }
    339 
    340   private void releaseBuffer() {
    341     if (buffer != null) {
    342       buffer.release();
    343       buffer = null;
    344     }
    345   }
    346 
    347   private void commitToSink(boolean endOfStream, boolean flush) {
    348     WritableBuffer buf = buffer;
    349     buffer = null;
    350     sink.deliverFrame(buf, endOfStream, flush, messagesBuffered);
    351     messagesBuffered = 0;
    352   }
    353 
    354   private void verifyNotClosed() {
    355     if (isClosed()) {
    356       throw new IllegalStateException("Framer already closed");
    357     }
    358   }
    359 
    360   /** OutputStream whose write()s are passed to the framer. */
    361   private class OutputStreamAdapter extends OutputStream {
    362     /**
    363      * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
    364      * Better yet, you can use your own single byte buffer and call
    365      * {@link #write(byte[], int, int)}.
    366      */
    367     @Override
    368     public void write(int b) {
    369       byte[] singleByte = new byte[]{(byte)b};
    370       write(singleByte, 0, 1);
    371     }
    372 
    373     @Override
    374     public void write(byte[] b, int off, int len) {
    375       writeRaw(b, off, len);
    376     }
    377   }
    378 
    379   /**
    380    * Produce a collection of {@link WritableBuffer} instances from the data written to an
    381    * {@link OutputStream}.
    382    */
    383   private final class BufferChainOutputStream extends OutputStream {
    384     private final List<WritableBuffer> bufferList = new ArrayList<>();
    385     private WritableBuffer current;
    386 
    387     /**
    388      * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
    389      * Better yet, you can use your own single byte buffer and call
    390      * {@link #write(byte[], int, int)}.
    391      */
    392     @Override
    393     public void write(int b) throws IOException {
    394       if (current != null && current.writableBytes() > 0) {
    395         current.write((byte)b);
    396         return;
    397       }
    398       byte[] singleByte = new byte[]{(byte)b};
    399       write(singleByte, 0, 1);
    400     }
    401 
    402     @Override
    403     public void write(byte[] b, int off, int len) {
    404       if (current == null) {
    405         // Request len bytes initially from the allocator, it may give us more.
    406         current = bufferAllocator.allocate(len);
    407         bufferList.add(current);
    408       }
    409       while (len > 0) {
    410         int canWrite = Math.min(len, current.writableBytes());
    411         if (canWrite == 0) {
    412           // Assume message is twice as large as previous assumption if were still not done,
    413           // the allocator may allocate more or less than this amount.
    414           int needed = Math.max(len, current.readableBytes() * 2);
    415           current = bufferAllocator.allocate(needed);
    416           bufferList.add(current);
    417         } else {
    418           current.write(b, off, canWrite);
    419           off += canWrite;
    420           len -= canWrite;
    421         }
    422       }
    423     }
    424 
    425     private int readableBytes() {
    426       int readable = 0;
    427       for (WritableBuffer writableBuffer : bufferList) {
    428         readable += writableBuffer.readableBytes();
    429       }
    430       return readable;
    431     }
    432   }
    433 }
    434