Home | History | Annotate | Download | only in spdy
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.squareup.okhttp.internal.spdy;
     18 
     19 import java.io.EOFException;
     20 import java.io.IOException;
     21 import java.io.InterruptedIOException;
     22 import java.net.SocketTimeoutException;
     23 import java.util.ArrayList;
     24 import java.util.List;
     25 import okio.BufferedSource;
     26 import okio.Deadline;
     27 import okio.OkBuffer;
     28 import okio.Sink;
     29 import okio.Source;
     30 
     31 import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
     32 
     33 /** A logical bidirectional stream. */
     34 public final class SpdyStream {
     35   // Internal state is guarded by this. No long-running or potentially
     36   // blocking operations are performed while the lock is held.
     37 
     38   /**
     39    * The total number of bytes consumed by the application (with {@link
     40    * SpdyDataSource#read}), but not yet acknowledged by sending a {@code
     41    * WINDOW_UPDATE} frame on this stream.
     42    */
     43   // Visible for testing
     44   long unacknowledgedBytesRead = 0;
     45 
     46   /**
     47    * Count of bytes that can be written on the stream before receiving a
     48    * window update. Even if this is positive, writes will block until there
     49    * available bytes in {@code connection.bytesLeftInWriteWindow}.
     50    */
     51   // guarded by this
     52   long bytesLeftInWriteWindow;
     53 
     54   private final int id;
     55   private final SpdyConnection connection;
     56   private final int priority;
     57   private long readTimeoutMillis = 0;
     58 
     59   /** Headers sent by the stream initiator. Immutable and non null. */
     60   private final List<Header> requestHeaders;
     61 
     62   /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */
     63   private List<Header> responseHeaders;
     64 
     65   private final SpdyDataSource source;
     66   final SpdyDataSink sink;
     67 
     68   /**
     69    * The reason why this stream was abnormally closed. If there are multiple
     70    * reasons to abnormally close this stream (such as both peers closing it
     71    * near-simultaneously) then this is the first reason known to this peer.
     72    */
     73   private ErrorCode errorCode = null;
     74 
     75   SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished,
     76       int priority, List<Header> requestHeaders) {
     77     if (connection == null) throw new NullPointerException("connection == null");
     78     if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
     79     this.id = id;
     80     this.connection = connection;
     81     this.bytesLeftInWriteWindow =
     82         connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
     83     this.source = new SpdyDataSource(
     84         connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE));
     85     this.sink = new SpdyDataSink();
     86     this.source.finished = inFinished;
     87     this.sink.finished = outFinished;
     88     this.priority = priority;
     89     this.requestHeaders = requestHeaders;
     90   }
     91 
     92   public int getId() {
     93     return id;
     94   }
     95 
     96   /**
     97    * Returns true if this stream is open. A stream is open until either:
     98    * <ul>
     99    * <li>A {@code SYN_RESET} frame abnormally terminates the stream.
    100    * <li>Both input and output streams have transmitted all data and
    101    * headers.
    102    * </ul>
    103    * Note that the input stream may continue to yield data even after a stream
    104    * reports itself as not open. This is because input data is buffered.
    105    */
    106   public synchronized boolean isOpen() {
    107     if (errorCode != null) {
    108       return false;
    109     }
    110     if ((source.finished || source.closed)
    111         && (sink.finished || sink.closed)
    112         && responseHeaders != null) {
    113       return false;
    114     }
    115     return true;
    116   }
    117 
    118   /** Returns true if this stream was created by this peer. */
    119   public boolean isLocallyInitiated() {
    120     boolean streamIsClient = ((id & 1) == 1);
    121     return connection.client == streamIsClient;
    122   }
    123 
    124   public SpdyConnection getConnection() {
    125     return connection;
    126   }
    127 
    128   public List<Header> getRequestHeaders() {
    129     return requestHeaders;
    130   }
    131 
    132   /**
    133    * Returns the stream's response headers, blocking if necessary if they
    134    * have not been received yet.
    135    */
    136   public synchronized List<Header> getResponseHeaders() throws IOException {
    137     long remaining = 0;
    138     long start = 0;
    139     if (readTimeoutMillis != 0) {
    140       start = (System.nanoTime() / 1000000);
    141       remaining = readTimeoutMillis;
    142     }
    143     try {
    144       while (responseHeaders == null && errorCode == null) {
    145         if (readTimeoutMillis == 0) { // No timeout configured.
    146           wait();
    147         } else if (remaining > 0) {
    148           wait(remaining);
    149           remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
    150         } else {
    151           throw new SocketTimeoutException("Read response header timeout. readTimeoutMillis: "
    152                             + readTimeoutMillis);
    153         }
    154       }
    155       if (responseHeaders != null) {
    156         return responseHeaders;
    157       }
    158       throw new IOException("stream was reset: " + errorCode);
    159     } catch (InterruptedException e) {
    160       InterruptedIOException rethrow = new InterruptedIOException();
    161       rethrow.initCause(e);
    162       throw rethrow;
    163     }
    164   }
    165 
    166   /**
    167    * Returns the reason why this stream was closed, or null if it closed
    168    * normally or has not yet been closed.
    169    */
    170   public synchronized ErrorCode getErrorCode() {
    171     return errorCode;
    172   }
    173 
    174   /**
    175    * Sends a reply to an incoming stream.
    176    *
    177    * @param out true to create an output stream that we can use to send data
    178    * to the remote peer. Corresponds to {@code FLAG_FIN}.
    179    */
    180   public void reply(List<Header> responseHeaders, boolean out) throws IOException {
    181     assert (!Thread.holdsLock(SpdyStream.this));
    182     boolean outFinished = false;
    183     synchronized (this) {
    184       if (responseHeaders == null) {
    185         throw new NullPointerException("responseHeaders == null");
    186       }
    187       if (this.responseHeaders != null) {
    188         throw new IllegalStateException("reply already sent");
    189       }
    190       this.responseHeaders = responseHeaders;
    191       if (!out) {
    192         this.sink.finished = true;
    193         outFinished = true;
    194       }
    195     }
    196     connection.writeSynReply(id, outFinished, responseHeaders);
    197 
    198     if (outFinished) {
    199       connection.flush();
    200     }
    201   }
    202 
    203   /**
    204    * Sets the maximum time to wait on input stream reads before failing with a
    205    * {@code SocketTimeoutException}, or {@code 0} to wait indefinitely.
    206    */
    207   public void setReadTimeout(long readTimeoutMillis) {
    208     this.readTimeoutMillis = readTimeoutMillis;
    209   }
    210 
    211   public long getReadTimeoutMillis() {
    212     return readTimeoutMillis;
    213   }
    214 
    215   /** Returns a source that reads data from the peer. */
    216   public Source getSource() {
    217     return source;
    218   }
    219 
    220   /**
    221    * Returns a sink that can be used to write data to the peer.
    222    *
    223    * @throws IllegalStateException if this stream was initiated by the peer
    224    *     and a {@link #reply} has not yet been sent.
    225    */
    226   public Sink getSink() {
    227     synchronized (this) {
    228       if (responseHeaders == null && !isLocallyInitiated()) {
    229         throw new IllegalStateException("reply before requesting the sink");
    230       }
    231     }
    232     return sink;
    233   }
    234 
    235   /**
    236    * Abnormally terminate this stream. This blocks until the {@code RST_STREAM}
    237    * frame has been transmitted.
    238    */
    239   public void close(ErrorCode rstStatusCode) throws IOException {
    240     if (!closeInternal(rstStatusCode)) {
    241       return; // Already closed.
    242     }
    243     connection.writeSynReset(id, rstStatusCode);
    244   }
    245 
    246   /**
    247    * Abnormally terminate this stream. This enqueues a {@code RST_STREAM}
    248    * frame and returns immediately.
    249    */
    250   public void closeLater(ErrorCode errorCode) {
    251     if (!closeInternal(errorCode)) {
    252       return; // Already closed.
    253     }
    254     connection.writeSynResetLater(id, errorCode);
    255   }
    256 
    257   /** Returns true if this stream was closed. */
    258   private boolean closeInternal(ErrorCode errorCode) {
    259     assert (!Thread.holdsLock(this));
    260     synchronized (this) {
    261       if (this.errorCode != null) {
    262         return false;
    263       }
    264       if (source.finished && sink.finished) {
    265         return false;
    266       }
    267       this.errorCode = errorCode;
    268       notifyAll();
    269     }
    270     connection.removeStream(id);
    271     return true;
    272   }
    273 
    274   void receiveHeaders(List<Header> headers, HeadersMode headersMode) {
    275     assert (!Thread.holdsLock(SpdyStream.this));
    276     ErrorCode errorCode = null;
    277     boolean open = true;
    278     synchronized (this) {
    279       if (responseHeaders == null) {
    280         if (headersMode.failIfHeadersAbsent()) {
    281           errorCode = ErrorCode.PROTOCOL_ERROR;
    282         } else {
    283           responseHeaders = headers;
    284           open = isOpen();
    285           notifyAll();
    286         }
    287       } else {
    288         if (headersMode.failIfHeadersPresent()) {
    289           errorCode = ErrorCode.STREAM_IN_USE;
    290         } else {
    291           List<Header> newHeaders = new ArrayList<Header>();
    292           newHeaders.addAll(responseHeaders);
    293           newHeaders.addAll(headers);
    294           this.responseHeaders = newHeaders;
    295         }
    296       }
    297     }
    298     if (errorCode != null) {
    299       closeLater(errorCode);
    300     } else if (!open) {
    301       connection.removeStream(id);
    302     }
    303   }
    304 
    305   void receiveData(BufferedSource in, int length) throws IOException {
    306     assert (!Thread.holdsLock(SpdyStream.this));
    307     this.source.receive(in, length);
    308   }
    309 
    310   void receiveFin() {
    311     assert (!Thread.holdsLock(SpdyStream.this));
    312     boolean open;
    313     synchronized (this) {
    314       this.source.finished = true;
    315       open = isOpen();
    316       notifyAll();
    317     }
    318     if (!open) {
    319       connection.removeStream(id);
    320     }
    321   }
    322 
    323   synchronized void receiveRstStream(ErrorCode errorCode) {
    324     if (this.errorCode == null) {
    325       this.errorCode = errorCode;
    326       notifyAll();
    327     }
    328   }
    329 
    330   int getPriority() {
    331     return priority;
    332   }
    333 
    334   /**
    335    * A source that reads the incoming data frames of a stream. Although this
    336    * class uses synchronization to safely receive incoming data frames, it is
    337    * not intended for use by multiple readers.
    338    */
    339   private final class SpdyDataSource implements Source {
    340     /** Buffer to receive data from the network into. Only accessed by the reader thread. */
    341     private final OkBuffer receiveBuffer = new OkBuffer();
    342 
    343     /** Buffer with readable data. Guarded by SpdyStream.this. */
    344     private final OkBuffer readBuffer = new OkBuffer();
    345 
    346     /** Maximum number of bytes to buffer before reporting a flow control error. */
    347     private final long maxByteCount;
    348 
    349     /** True if the caller has closed this stream. */
    350     private boolean closed;
    351 
    352     /**
    353      * True if either side has cleanly shut down this stream. We will
    354      * receive no more bytes beyond those already in the buffer.
    355      */
    356     private boolean finished;
    357 
    358     private SpdyDataSource(long maxByteCount) {
    359       this.maxByteCount = maxByteCount;
    360     }
    361 
    362     @Override public long read(OkBuffer sink, long byteCount)
    363         throws IOException {
    364       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    365 
    366       long read;
    367       synchronized (SpdyStream.this) {
    368         waitUntilReadable();
    369         checkNotClosed();
    370         if (readBuffer.size() == 0) return -1; // This source is exhausted.
    371 
    372         // Move bytes from the read buffer into the caller's buffer.
    373         read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
    374 
    375         // Flow control: notify the peer that we're ready for more data!
    376         unacknowledgedBytesRead += read;
    377         if (unacknowledgedBytesRead
    378             >= connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
    379           connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
    380           unacknowledgedBytesRead = 0;
    381         }
    382       }
    383 
    384       // Update connection.unacknowledgedBytesRead outside the stream lock.
    385       synchronized (connection) { // Multiple application threads may hit this section.
    386         connection.unacknowledgedBytesRead += read;
    387         if (connection.unacknowledgedBytesRead
    388             >= connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
    389           connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
    390           connection.unacknowledgedBytesRead = 0;
    391         }
    392       }
    393 
    394       return read;
    395     }
    396 
    397     /**
    398      * Returns once the input stream is either readable or finished. Throws
    399      * a {@link SocketTimeoutException} if the read timeout elapses before
    400      * that happens.
    401      */
    402     private void waitUntilReadable() throws IOException {
    403       long start = 0;
    404       long remaining = 0;
    405       if (readTimeoutMillis != 0) {
    406         start = (System.nanoTime() / 1000000);
    407         remaining = readTimeoutMillis;
    408       }
    409       try {
    410         while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) {
    411           if (readTimeoutMillis == 0) {
    412             SpdyStream.this.wait();
    413           } else if (remaining > 0) {
    414             SpdyStream.this.wait(remaining);
    415             remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
    416           } else {
    417             throw new SocketTimeoutException("Read timed out");
    418           }
    419         }
    420       } catch (InterruptedException e) {
    421         throw new InterruptedIOException();
    422       }
    423     }
    424 
    425     void receive(BufferedSource in, long byteCount) throws IOException {
    426       assert (!Thread.holdsLock(SpdyStream.this));
    427 
    428       while (byteCount > 0) {
    429         boolean finished;
    430         boolean flowControlError;
    431         synchronized (SpdyStream.this) {
    432           finished = this.finished;
    433           flowControlError = byteCount + readBuffer.size() > maxByteCount;
    434         }
    435 
    436         // If the peer sends more data than we can handle, discard it and close the connection.
    437         if (flowControlError) {
    438           in.skip(byteCount);
    439           closeLater(ErrorCode.FLOW_CONTROL_ERROR);
    440           return;
    441         }
    442 
    443         // Discard data received after the stream is finished. It's probably a benign race.
    444         if (finished) {
    445           in.skip(byteCount);
    446           return;
    447         }
    448 
    449         // Fill the receive buffer without holding any locks.
    450         long read = in.read(receiveBuffer, byteCount);
    451         if (read == -1) throw new EOFException();
    452         byteCount -= read;
    453 
    454         // Move the received data to the read buffer to the reader can read it.
    455         synchronized (SpdyStream.this) {
    456           boolean wasEmpty = readBuffer.size() == 0;
    457           readBuffer.write(receiveBuffer, receiveBuffer.size());
    458           if (wasEmpty) {
    459             SpdyStream.this.notifyAll();
    460           }
    461         }
    462       }
    463     }
    464 
    465     @Override public Source deadline(Deadline deadline) {
    466       // TODO: honor deadlines.
    467       return this;
    468     }
    469 
    470     @Override public void close() throws IOException {
    471       synchronized (SpdyStream.this) {
    472         closed = true;
    473         readBuffer.clear();
    474         SpdyStream.this.notifyAll();
    475       }
    476       cancelStreamIfNecessary();
    477     }
    478 
    479     private void checkNotClosed() throws IOException {
    480       if (closed) {
    481         throw new IOException("stream closed");
    482       }
    483       if (errorCode != null) {
    484         throw new IOException("stream was reset: " + errorCode);
    485       }
    486     }
    487   }
    488 
    489   private void cancelStreamIfNecessary() throws IOException {
    490     assert (!Thread.holdsLock(SpdyStream.this));
    491     boolean open;
    492     boolean cancel;
    493     synchronized (this) {
    494       cancel = !source.finished && source.closed && (sink.finished || sink.closed);
    495       open = isOpen();
    496     }
    497     if (cancel) {
    498       // RST this stream to prevent additional data from being sent. This
    499       // is safe because the input stream is closed (we won't use any
    500       // further bytes) and the output stream is either finished or closed
    501       // (so RSTing both streams doesn't cause harm).
    502       SpdyStream.this.close(ErrorCode.CANCEL);
    503     } else if (!open) {
    504       connection.removeStream(id);
    505     }
    506   }
    507 
    508   /**
    509    * A sink that writes outgoing data frames of a stream. This class is not
    510    * thread safe.
    511    */
    512   final class SpdyDataSink implements Sink {
    513     private boolean closed;
    514 
    515     /**
    516      * True if either side has cleanly shut down this stream. We shall send
    517      * no more bytes.
    518      */
    519     private boolean finished;
    520 
    521     @Override public void write(OkBuffer source, long byteCount) throws IOException {
    522       assert (!Thread.holdsLock(SpdyStream.this));
    523       while (byteCount > 0) {
    524         long toWrite;
    525         synchronized (SpdyStream.this) {
    526           try {
    527             while (bytesLeftInWriteWindow <= 0) {
    528               SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE.
    529             }
    530           } catch (InterruptedException e) {
    531             throw new InterruptedIOException();
    532           }
    533 
    534           checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
    535           toWrite = Math.min(bytesLeftInWriteWindow, byteCount);
    536           bytesLeftInWriteWindow -= toWrite;
    537         }
    538 
    539         byteCount -= toWrite;
    540         connection.writeData(id, false, source, toWrite);
    541       }
    542     }
    543 
    544     @Override public void flush() throws IOException {
    545       assert (!Thread.holdsLock(SpdyStream.this));
    546       synchronized (SpdyStream.this) {
    547         checkOutNotClosed();
    548       }
    549       connection.flush();
    550     }
    551 
    552     @Override public Sink deadline(Deadline deadline) {
    553       // TODO: honor deadlines.
    554       return this;
    555     }
    556 
    557     @Override public void close() throws IOException {
    558       assert (!Thread.holdsLock(SpdyStream.this));
    559       synchronized (SpdyStream.this) {
    560         if (closed) return;
    561       }
    562       if (!sink.finished) {
    563         connection.writeData(id, true, null, 0);
    564       }
    565       synchronized (SpdyStream.this) {
    566         closed = true;
    567       }
    568       connection.flush();
    569       cancelStreamIfNecessary();
    570     }
    571   }
    572 
    573   /**
    574    * {@code delta} will be negative if a settings frame initial window is
    575    * smaller than the last.
    576    */
    577   void addBytesToWriteWindow(long delta) {
    578     bytesLeftInWriteWindow += delta;
    579     if (delta > 0) SpdyStream.this.notifyAll();
    580   }
    581 
    582   private void checkOutNotClosed() throws IOException {
    583     if (sink.closed) {
    584       throw new IOException("stream closed");
    585     } else if (sink.finished) {
    586       throw new IOException("stream finished");
    587     } else if (errorCode != null) {
    588       throw new IOException("stream was reset: " + errorCode);
    589     }
    590   }
    591 }
    592