Home | History | Annotate | Download | only in spdy
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.squareup.okhttp.internal.spdy;
     18 
     19 import java.io.EOFException;
     20 import java.io.IOException;
     21 import java.io.InterruptedIOException;
     22 import java.net.SocketTimeoutException;
     23 import java.util.ArrayList;
     24 import java.util.List;
     25 import okio.AsyncTimeout;
     26 import okio.Buffer;
     27 import okio.BufferedSource;
     28 import okio.Sink;
     29 import okio.Source;
     30 import okio.Timeout;
     31 
     32 import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
     33 
     34 /** A logical bidirectional stream. */
     35 public final class SpdyStream {
     36   // Internal state is guarded by this. No long-running or potentially
     37   // blocking operations are performed while the lock is held.
     38 
     39   /**
     40    * The total number of bytes consumed by the application (with {@link
     41    * SpdyDataSource#read}), but not yet acknowledged by sending a {@code
     42    * WINDOW_UPDATE} frame on this stream.
     43    */
     44   // Visible for testing
     45   long unacknowledgedBytesRead = 0;
     46 
     47   /**
     48    * Count of bytes that can be written on the stream before receiving a
     49    * window update. Even if this is positive, writes will block until there
     50    * are available bytes in {@code connection.bytesLeftInWriteWindow}.
     51    */
     52   // guarded by this
     53   long bytesLeftInWriteWindow;
     54 
     55   private final int id;
     56   private final SpdyConnection connection;
     57 
     58   /** Headers sent by the stream initiator. Immutable and non null. */
     59   private final List<Header> requestHeaders;
     60 
     61   /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */
     62   private List<Header> responseHeaders;
     63 
     64   private final SpdyDataSource source;
     65   final SpdyDataSink sink;
     66   private final SpdyTimeout readTimeout = new SpdyTimeout();
     67   private final SpdyTimeout writeTimeout = new SpdyTimeout();
     68 
     69   /**
     70    * The reason why this stream was abnormally closed. If there are multiple
     71    * reasons to abnormally close this stream (such as both peers closing it
     72    * near-simultaneously) then this is the first reason known to this peer.
     73    */
     74   private ErrorCode errorCode = null;
     75 
     76   SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished,
     77       List<Header> requestHeaders) {
     78     if (connection == null) throw new NullPointerException("connection == null");
     79     if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
     80     this.id = id;
     81     this.connection = connection;
     82     this.bytesLeftInWriteWindow =
     83         connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
     84     this.source = new SpdyDataSource(
     85         connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE));
     86     this.sink = new SpdyDataSink();
     87     this.source.finished = inFinished;
     88     this.sink.finished = outFinished;
     89     this.requestHeaders = requestHeaders;
     90   }
     91 
     92   public int getId() {
     93     return id;
     94   }
     95 
     96   /**
     97    * Returns true if this stream is open. A stream is open until either:
     98    * <ul>
     99    * <li>A {@code SYN_RESET} frame abnormally terminates the stream.
    100    * <li>Both input and output streams have transmitted all data and
    101    * headers.
    102    * </ul>
    103    * Note that the input stream may continue to yield data even after a stream
    104    * reports itself as not open. This is because input data is buffered.
    105    */
    106   public synchronized boolean isOpen() {
    107     if (errorCode != null) {
    108       return false;
    109     }
    110     if ((source.finished || source.closed)
    111         && (sink.finished || sink.closed)
    112         && responseHeaders != null) {
    113       return false;
    114     }
    115     return true;
    116   }
    117 
    118   /** Returns true if this stream was created by this peer. */
    119   public boolean isLocallyInitiated() {
    120     boolean streamIsClient = ((id & 1) == 1);
    121     return connection.client == streamIsClient;
    122   }
    123 
    124   public SpdyConnection getConnection() {
    125     return connection;
    126   }
    127 
    128   public List<Header> getRequestHeaders() {
    129     return requestHeaders;
    130   }
    131 
    132   /**
    133    * Returns the stream's response headers, blocking if necessary if they
    134    * have not been received yet.
    135    */
    136   public synchronized List<Header> getResponseHeaders() throws IOException {
    137     readTimeout.enter();
    138     try {
    139       while (responseHeaders == null && errorCode == null) {
    140         waitForIo();
    141       }
    142     } finally {
    143       readTimeout.exitAndThrowIfTimedOut();
    144     }
    145     if (responseHeaders != null) return responseHeaders;
    146     throw new IOException("stream was reset: " + errorCode);
    147   }
    148 
    149   /**
    150    * Returns the reason why this stream was closed, or null if it closed
    151    * normally or has not yet been closed.
    152    */
    153   public synchronized ErrorCode getErrorCode() {
    154     return errorCode;
    155   }
    156 
    157   /**
    158    * Sends a reply to an incoming stream.
    159    *
    160    * @param out true to create an output stream that we can use to send data
    161    * to the remote peer. Corresponds to {@code FLAG_FIN}.
    162    */
    163   public void reply(List<Header> responseHeaders, boolean out) throws IOException {
    164     assert (!Thread.holdsLock(SpdyStream.this));
    165     boolean outFinished = false;
    166     synchronized (this) {
    167       if (responseHeaders == null) {
    168         throw new NullPointerException("responseHeaders == null");
    169       }
    170       if (this.responseHeaders != null) {
    171         throw new IllegalStateException("reply already sent");
    172       }
    173       this.responseHeaders = responseHeaders;
    174       if (!out) {
    175         this.sink.finished = true;
    176         outFinished = true;
    177       }
    178     }
    179     connection.writeSynReply(id, outFinished, responseHeaders);
    180 
    181     if (outFinished) {
    182       connection.flush();
    183     }
    184   }
    185 
    186   public Timeout readTimeout() {
    187     return readTimeout;
    188   }
    189 
    190   public Timeout writeTimeout() {
    191     return writeTimeout;
    192   }
    193 
    194   /** Returns a source that reads data from the peer. */
    195   public Source getSource() {
    196     return source;
    197   }
    198 
    199   /**
    200    * Returns a sink that can be used to write data to the peer.
    201    *
    202    * @throws IllegalStateException if this stream was initiated by the peer
    203    *     and a {@link #reply} has not yet been sent.
    204    */
    205   public Sink getSink() {
    206     synchronized (this) {
    207       if (responseHeaders == null && !isLocallyInitiated()) {
    208         throw new IllegalStateException("reply before requesting the sink");
    209       }
    210     }
    211     return sink;
    212   }
    213 
    214   /**
    215    * Abnormally terminate this stream. This blocks until the {@code RST_STREAM}
    216    * frame has been transmitted.
    217    */
    218   public void close(ErrorCode rstStatusCode) throws IOException {
    219     if (!closeInternal(rstStatusCode)) {
    220       return; // Already closed.
    221     }
    222     connection.writeSynReset(id, rstStatusCode);
    223   }
    224 
    225   /**
    226    * Abnormally terminate this stream. This enqueues a {@code RST_STREAM}
    227    * frame and returns immediately.
    228    */
    229   public void closeLater(ErrorCode errorCode) {
    230     if (!closeInternal(errorCode)) {
    231       return; // Already closed.
    232     }
    233     connection.writeSynResetLater(id, errorCode);
    234   }
    235 
    236   /** Returns true if this stream was closed. */
    237   private boolean closeInternal(ErrorCode errorCode) {
    238     assert (!Thread.holdsLock(this));
    239     synchronized (this) {
    240       if (this.errorCode != null) {
    241         return false;
    242       }
    243       if (source.finished && sink.finished) {
    244         return false;
    245       }
    246       this.errorCode = errorCode;
    247       notifyAll();
    248     }
    249     connection.removeStream(id);
    250     return true;
    251   }
    252 
    253   void receiveHeaders(List<Header> headers, HeadersMode headersMode) {
    254     assert (!Thread.holdsLock(SpdyStream.this));
    255     ErrorCode errorCode = null;
    256     boolean open = true;
    257     synchronized (this) {
    258       if (responseHeaders == null) {
    259         if (headersMode.failIfHeadersAbsent()) {
    260           errorCode = ErrorCode.PROTOCOL_ERROR;
    261         } else {
    262           responseHeaders = headers;
    263           open = isOpen();
    264           notifyAll();
    265         }
    266       } else {
    267         if (headersMode.failIfHeadersPresent()) {
    268           errorCode = ErrorCode.STREAM_IN_USE;
    269         } else {
    270           List<Header> newHeaders = new ArrayList<>();
    271           newHeaders.addAll(responseHeaders);
    272           newHeaders.addAll(headers);
    273           this.responseHeaders = newHeaders;
    274         }
    275       }
    276     }
    277     if (errorCode != null) {
    278       closeLater(errorCode);
    279     } else if (!open) {
    280       connection.removeStream(id);
    281     }
    282   }
    283 
    284   void receiveData(BufferedSource in, int length) throws IOException {
    285     assert (!Thread.holdsLock(SpdyStream.this));
    286     this.source.receive(in, length);
    287   }
    288 
    289   void receiveFin() {
    290     assert (!Thread.holdsLock(SpdyStream.this));
    291     boolean open;
    292     synchronized (this) {
    293       this.source.finished = true;
    294       open = isOpen();
    295       notifyAll();
    296     }
    297     if (!open) {
    298       connection.removeStream(id);
    299     }
    300   }
    301 
    302   synchronized void receiveRstStream(ErrorCode errorCode) {
    303     if (this.errorCode == null) {
    304       this.errorCode = errorCode;
    305       notifyAll();
    306     }
    307   }
    308 
    309   /**
    310    * A source that reads the incoming data frames of a stream. Although this
    311    * class uses synchronization to safely receive incoming data frames, it is
    312    * not intended for use by multiple readers.
    313    */
    314   private final class SpdyDataSource implements Source {
    315     /** Buffer to receive data from the network into. Only accessed by the reader thread. */
    316     private final Buffer receiveBuffer = new Buffer();
    317 
    318     /** Buffer with readable data. Guarded by SpdyStream.this. */
    319     private final Buffer readBuffer = new Buffer();
    320 
    321     /** Maximum number of bytes to buffer before reporting a flow control error. */
    322     private final long maxByteCount;
    323 
    324     /** True if the caller has closed this stream. */
    325     private boolean closed;
    326 
    327     /**
    328      * True if either side has cleanly shut down this stream. We will
    329      * receive no more bytes beyond those already in the buffer.
    330      */
    331     private boolean finished;
    332 
    333     private SpdyDataSource(long maxByteCount) {
    334       this.maxByteCount = maxByteCount;
    335     }
    336 
    337     @Override public long read(Buffer sink, long byteCount)
    338         throws IOException {
    339       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    340 
    341       long read;
    342       synchronized (SpdyStream.this) {
    343         waitUntilReadable();
    344         checkNotClosed();
    345         if (readBuffer.size() == 0) return -1; // This source is exhausted.
    346 
    347         // Move bytes from the read buffer into the caller's buffer.
    348         read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
    349 
    350         // Flow control: notify the peer that we're ready for more data!
    351         unacknowledgedBytesRead += read;
    352         if (unacknowledgedBytesRead
    353             >= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
    354           connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
    355           unacknowledgedBytesRead = 0;
    356         }
    357       }
    358 
    359       // Update connection.unacknowledgedBytesRead outside the stream lock.
    360       synchronized (connection) { // Multiple application threads may hit this section.
    361         connection.unacknowledgedBytesRead += read;
    362         if (connection.unacknowledgedBytesRead
    363             >= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
    364           connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
    365           connection.unacknowledgedBytesRead = 0;
    366         }
    367       }
    368 
    369       return read;
    370     }
    371 
    372     /** Returns once the source is either readable or finished. */
    373     private void waitUntilReadable() throws IOException {
    374       readTimeout.enter();
    375       try {
    376         while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) {
    377           waitForIo();
    378         }
    379       } finally {
    380         readTimeout.exitAndThrowIfTimedOut();
    381       }
    382     }
    383 
    384     void receive(BufferedSource in, long byteCount) throws IOException {
    385       assert (!Thread.holdsLock(SpdyStream.this));
    386 
    387       while (byteCount > 0) {
    388         boolean finished;
    389         boolean flowControlError;
    390         synchronized (SpdyStream.this) {
    391           finished = this.finished;
    392           flowControlError = byteCount + readBuffer.size() > maxByteCount;
    393         }
    394 
    395         // If the peer sends more data than we can handle, discard it and close the connection.
    396         if (flowControlError) {
    397           in.skip(byteCount);
    398           closeLater(ErrorCode.FLOW_CONTROL_ERROR);
    399           return;
    400         }
    401 
    402         // Discard data received after the stream is finished. It's probably a benign race.
    403         if (finished) {
    404           in.skip(byteCount);
    405           return;
    406         }
    407 
    408         // Fill the receive buffer without holding any locks.
    409         long read = in.read(receiveBuffer, byteCount);
    410         if (read == -1) throw new EOFException();
    411         byteCount -= read;
    412 
    413         // Move the received data to the read buffer to the reader can read it.
    414         synchronized (SpdyStream.this) {
    415           boolean wasEmpty = readBuffer.size() == 0;
    416           readBuffer.writeAll(receiveBuffer);
    417           if (wasEmpty) {
    418             SpdyStream.this.notifyAll();
    419           }
    420         }
    421       }
    422     }
    423 
    424     @Override public Timeout timeout() {
    425       return readTimeout;
    426     }
    427 
    428     @Override public void close() throws IOException {
    429       synchronized (SpdyStream.this) {
    430         closed = true;
    431         readBuffer.clear();
    432         SpdyStream.this.notifyAll();
    433       }
    434       cancelStreamIfNecessary();
    435     }
    436 
    437     private void checkNotClosed() throws IOException {
    438       if (closed) {
    439         throw new IOException("stream closed");
    440       }
    441       if (errorCode != null) {
    442         throw new IOException("stream was reset: " + errorCode);
    443       }
    444     }
    445   }
    446 
    447   private void cancelStreamIfNecessary() throws IOException {
    448     assert (!Thread.holdsLock(SpdyStream.this));
    449     boolean open;
    450     boolean cancel;
    451     synchronized (this) {
    452       cancel = !source.finished && source.closed && (sink.finished || sink.closed);
    453       open = isOpen();
    454     }
    455     if (cancel) {
    456       // RST this stream to prevent additional data from being sent. This
    457       // is safe because the input stream is closed (we won't use any
    458       // further bytes) and the output stream is either finished or closed
    459       // (so RSTing both streams doesn't cause harm).
    460       SpdyStream.this.close(ErrorCode.CANCEL);
    461     } else if (!open) {
    462       connection.removeStream(id);
    463     }
    464   }
    465 
    466   /**
    467    * A sink that writes outgoing data frames of a stream. This class is not
    468    * thread safe.
    469    */
    470   final class SpdyDataSink implements Sink {
    471     private static final long EMIT_BUFFER_SIZE = 16384;
    472 
    473     /**
    474      * Buffer of outgoing data. This batches writes of small writes into this sink as larges
    475      * frames written to the outgoing connection. Batching saves the (small) framing overhead.
    476      */
    477     private final Buffer sendBuffer = new Buffer();
    478 
    479     private boolean closed;
    480 
    481     /**
    482      * True if either side has cleanly shut down this stream. We shall send
    483      * no more bytes.
    484      */
    485     private boolean finished;
    486 
    487     @Override public void write(Buffer source, long byteCount) throws IOException {
    488       assert (!Thread.holdsLock(SpdyStream.this));
    489       sendBuffer.write(source, byteCount);
    490       while (sendBuffer.size() >= EMIT_BUFFER_SIZE) {
    491         emitDataFrame(false);
    492       }
    493     }
    494 
    495     /**
    496      * Emit a single data frame to the connection. The frame's size be limited by this stream's
    497      * write window. This method will block until the write window is nonempty.
    498      */
    499     private void emitDataFrame(boolean outFinished) throws IOException {
    500       long toWrite;
    501       synchronized (SpdyStream.this) {
    502         writeTimeout.enter();
    503         try {
    504           while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
    505             waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream.
    506           }
    507         } finally {
    508           writeTimeout.exitAndThrowIfTimedOut();
    509         }
    510 
    511         checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
    512         toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
    513         bytesLeftInWriteWindow -= toWrite;
    514       }
    515 
    516       writeTimeout.enter();
    517       try {
    518         connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite);
    519       } finally {
    520         writeTimeout.exitAndThrowIfTimedOut();
    521       }
    522     }
    523 
    524     @Override public void flush() throws IOException {
    525       assert (!Thread.holdsLock(SpdyStream.this));
    526       synchronized (SpdyStream.this) {
    527         checkOutNotClosed();
    528       }
    529       while (sendBuffer.size() > 0) {
    530         emitDataFrame(false);
    531         connection.flush();
    532       }
    533     }
    534 
    535     @Override public Timeout timeout() {
    536       return writeTimeout;
    537     }
    538 
    539     @Override public void close() throws IOException {
    540       assert (!Thread.holdsLock(SpdyStream.this));
    541       synchronized (SpdyStream.this) {
    542         if (closed) return;
    543       }
    544       if (!sink.finished) {
    545         // Emit the remaining data, setting the END_STREAM flag on the last frame.
    546         if (sendBuffer.size() > 0) {
    547           while (sendBuffer.size() > 0) {
    548             emitDataFrame(true);
    549           }
    550         } else {
    551           // Send an empty frame just so we can set the END_STREAM flag.
    552           connection.writeData(id, true, null, 0);
    553         }
    554       }
    555       synchronized (SpdyStream.this) {
    556         closed = true;
    557       }
    558       connection.flush();
    559       cancelStreamIfNecessary();
    560     }
    561   }
    562 
    563   /**
    564    * {@code delta} will be negative if a settings frame initial window is
    565    * smaller than the last.
    566    */
    567   void addBytesToWriteWindow(long delta) {
    568     bytesLeftInWriteWindow += delta;
    569     if (delta > 0) SpdyStream.this.notifyAll();
    570   }
    571 
    572   private void checkOutNotClosed() throws IOException {
    573     if (sink.closed) {
    574       throw new IOException("stream closed");
    575     } else if (sink.finished) {
    576       throw new IOException("stream finished");
    577     } else if (errorCode != null) {
    578       throw new IOException("stream was reset: " + errorCode);
    579     }
    580   }
    581 
    582   /**
    583    * Like {@link #wait}, but throws an {@code InterruptedIOException} when
    584    * interrupted instead of the more awkward {@link InterruptedException}.
    585    */
    586   private void waitForIo() throws InterruptedIOException {
    587     try {
    588       wait();
    589     } catch (InterruptedException e) {
    590       throw new InterruptedIOException();
    591     }
    592   }
    593 
    594   /**
    595    * The Okio timeout watchdog will call {@link #timedOut} if the timeout is
    596    * reached. In that case we close the stream (asynchronously) which will
    597    * notify the waiting thread.
    598    */
    599   class SpdyTimeout extends AsyncTimeout {
    600     @Override protected void timedOut() {
    601       closeLater(ErrorCode.CANCEL);
    602     }
    603 
    604     @Override protected IOException newTimeoutException(IOException cause) {
    605       SocketTimeoutException socketTimeoutException = new SocketTimeoutException("timeout");
    606       if (cause != null) {
    607         socketTimeoutException.initCause(cause);
    608       }
    609       return socketTimeoutException;
    610     }
    611 
    612     public void exitAndThrowIfTimedOut() throws IOException {
    613       if (exit()) throw newTimeoutException(null /* cause */);
    614     }
    615   }
    616 }
    617