Home | History | Annotate | Download | only in spdy
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.squareup.okhttp.internal.spdy;
     18 
     19 import com.squareup.okhttp.internal.Util;
     20 import java.io.IOException;
     21 import java.io.InputStream;
     22 import java.io.InterruptedIOException;
     23 import java.io.OutputStream;
     24 import java.net.SocketTimeoutException;
     25 import java.util.ArrayList;
     26 import java.util.List;
     27 
     28 import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
     29 import static com.squareup.okhttp.internal.Util.pokeInt;
     30 import static java.nio.ByteOrder.BIG_ENDIAN;
     31 
     32 /** A logical bidirectional stream. */
     33 public final class SpdyStream {
     34 
     35   // Internal state is guarded by this. No long-running or potentially
     36   // blocking operations are performed while the lock is held.
     37 
     38   private static final int DATA_FRAME_HEADER_LENGTH = 8;
     39 
     40   private static final String[] STATUS_CODE_NAMES = {
     41       null,
     42       "PROTOCOL_ERROR",
     43       "INVALID_STREAM",
     44       "REFUSED_STREAM",
     45       "UNSUPPORTED_VERSION",
     46       "CANCEL",
     47       "INTERNAL_ERROR",
     48       "FLOW_CONTROL_ERROR",
     49       "STREAM_IN_USE",
     50       "STREAM_ALREADY_CLOSED",
     51       "INVALID_CREDENTIALS",
     52       "FRAME_TOO_LARGE"
     53   };
     54 
     55   public static final int RST_PROTOCOL_ERROR = 1;
     56   public static final int RST_INVALID_STREAM = 2;
     57   public static final int RST_REFUSED_STREAM = 3;
     58   public static final int RST_UNSUPPORTED_VERSION = 4;
     59   public static final int RST_CANCEL = 5;
     60   public static final int RST_INTERNAL_ERROR = 6;
     61   public static final int RST_FLOW_CONTROL_ERROR = 7;
     62   public static final int RST_STREAM_IN_USE = 8;
     63   public static final int RST_STREAM_ALREADY_CLOSED = 9;
     64   public static final int RST_INVALID_CREDENTIALS = 10;
     65   public static final int RST_FRAME_TOO_LARGE = 11;
     66 
     67   /**
     68    * The number of unacknowledged bytes at which the input stream will send
     69    * the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's
     70    * window size, otherwise the remote peer will stop sending data on this
     71    * stream. (Chrome 25 uses 5 MiB.)
     72    */
     73   public static final int WINDOW_UPDATE_THRESHOLD = Settings.DEFAULT_INITIAL_WINDOW_SIZE / 2;
     74 
     75   private final int id;
     76   private final SpdyConnection connection;
     77   private final int priority;
     78   private final int slot;
     79   private long readTimeoutMillis = 0;
     80   private int writeWindowSize;
     81 
     82   /** Headers sent by the stream initiator. Immutable and non null. */
     83   private final List<String> requestHeaders;
     84 
     85   /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */
     86   private List<String> responseHeaders;
     87 
     88   private final SpdyDataInputStream in = new SpdyDataInputStream();
     89   private final SpdyDataOutputStream out = new SpdyDataOutputStream();
     90 
     91   /**
     92    * The reason why this stream was abnormally closed. If there are multiple
     93    * reasons to abnormally close this stream (such as both peers closing it
     94    * near-simultaneously) then this is the first reason known to this peer.
     95    */
     96   private int rstStatusCode = -1;
     97 
     98   SpdyStream(int id, SpdyConnection connection, int flags, int priority, int slot,
     99       List<String> requestHeaders, Settings settings) {
    100     if (connection == null) throw new NullPointerException("connection == null");
    101     if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
    102     this.id = id;
    103     this.connection = connection;
    104     this.priority = priority;
    105     this.slot = slot;
    106     this.requestHeaders = requestHeaders;
    107 
    108     if (isLocallyInitiated()) {
    109       // I am the sender
    110       in.finished = (flags & SpdyConnection.FLAG_UNIDIRECTIONAL) != 0;
    111       out.finished = (flags & SpdyConnection.FLAG_FIN) != 0;
    112     } else {
    113       // I am the receiver
    114       in.finished = (flags & SpdyConnection.FLAG_FIN) != 0;
    115       out.finished = (flags & SpdyConnection.FLAG_UNIDIRECTIONAL) != 0;
    116     }
    117 
    118     setSettings(settings);
    119   }
    120 
    121   /**
    122    * Returns true if this stream is open. A stream is open until either:
    123    * <ul>
    124    * <li>A {@code SYN_RESET} frame abnormally terminates the stream.
    125    * <li>Both input and output streams have transmitted all data and
    126    * headers.
    127    * </ul>
    128    * Note that the input stream may continue to yield data even after a stream
    129    * reports itself as not open. This is because input data is buffered.
    130    */
    131   public synchronized boolean isOpen() {
    132     if (rstStatusCode != -1) {
    133       return false;
    134     }
    135     if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) {
    136       return false;
    137     }
    138     return true;
    139   }
    140 
    141   /** Returns true if this stream was created by this peer. */
    142   public boolean isLocallyInitiated() {
    143     boolean streamIsClient = (id % 2 == 1);
    144     return connection.client == streamIsClient;
    145   }
    146 
    147   public SpdyConnection getConnection() {
    148     return connection;
    149   }
    150 
    151   public List<String> getRequestHeaders() {
    152     return requestHeaders;
    153   }
    154 
    155   /**
    156    * Returns the stream's response headers, blocking if necessary if they
    157    * have not been received yet.
    158    */
    159   public synchronized List<String> getResponseHeaders() throws IOException {
    160     try {
    161       while (responseHeaders == null && rstStatusCode == -1) {
    162         wait();
    163       }
    164       if (responseHeaders != null) {
    165         return responseHeaders;
    166       }
    167       throw new IOException("stream was reset: " + rstStatusString());
    168     } catch (InterruptedException e) {
    169       InterruptedIOException rethrow = new InterruptedIOException();
    170       rethrow.initCause(e);
    171       throw rethrow;
    172     }
    173   }
    174 
    175   /**
    176    * Returns the reason why this stream was closed, or -1 if it closed
    177    * normally or has not yet been closed. Valid reasons are {@link
    178    * #RST_PROTOCOL_ERROR}, {@link #RST_INVALID_STREAM}, {@link
    179    * #RST_REFUSED_STREAM}, {@link #RST_UNSUPPORTED_VERSION}, {@link
    180    * #RST_CANCEL}, {@link #RST_INTERNAL_ERROR} and {@link
    181    * #RST_FLOW_CONTROL_ERROR}.
    182    */
    183   public synchronized int getRstStatusCode() {
    184     return rstStatusCode;
    185   }
    186 
    187   /**
    188    * Sends a reply to an incoming stream.
    189    *
    190    * @param out true to create an output stream that we can use to send data
    191    * to the remote peer. Corresponds to {@code FLAG_FIN}.
    192    */
    193   public void reply(List<String> responseHeaders, boolean out) throws IOException {
    194     assert (!Thread.holdsLock(SpdyStream.this));
    195     int flags = 0;
    196     synchronized (this) {
    197       if (responseHeaders == null) {
    198         throw new NullPointerException("responseHeaders == null");
    199       }
    200       if (isLocallyInitiated()) {
    201         throw new IllegalStateException("cannot reply to a locally initiated stream");
    202       }
    203       if (this.responseHeaders != null) {
    204         throw new IllegalStateException("reply already sent");
    205       }
    206       this.responseHeaders = responseHeaders;
    207       if (!out) {
    208         this.out.finished = true;
    209         flags |= SpdyConnection.FLAG_FIN;
    210       }
    211     }
    212     connection.writeSynReply(id, flags, responseHeaders);
    213   }
    214 
    215   /**
    216    * Sets the maximum time to wait on input stream reads before failing with a
    217    * {@code SocketTimeoutException}, or {@code 0} to wait indefinitely.
    218    */
    219   public void setReadTimeout(long readTimeoutMillis) {
    220     this.readTimeoutMillis = readTimeoutMillis;
    221   }
    222 
    223   public long getReadTimeoutMillis() {
    224     return readTimeoutMillis;
    225   }
    226 
    227   /** Returns an input stream that can be used to read data from the peer. */
    228   public InputStream getInputStream() {
    229     return in;
    230   }
    231 
    232   /**
    233    * Returns an output stream that can be used to write data to the peer.
    234    *
    235    * @throws IllegalStateException if this stream was initiated by the peer
    236    * and a {@link #reply} has not yet been sent.
    237    */
    238   public OutputStream getOutputStream() {
    239     synchronized (this) {
    240       if (responseHeaders == null && !isLocallyInitiated()) {
    241         throw new IllegalStateException("reply before requesting the output stream");
    242       }
    243     }
    244     return out;
    245   }
    246 
    247   /**
    248    * Abnormally terminate this stream. This blocks until the {@code RST_STREAM}
    249    * frame has been transmitted.
    250    */
    251   public void close(int rstStatusCode) throws IOException {
    252     if (!closeInternal(rstStatusCode)) {
    253       return; // Already closed.
    254     }
    255     connection.writeSynReset(id, rstStatusCode);
    256   }
    257 
    258   /**
    259    * Abnormally terminate this stream. This enqueues a {@code RST_STREAM}
    260    * frame and returns immediately.
    261    */
    262   public void closeLater(int rstStatusCode) {
    263     if (!closeInternal(rstStatusCode)) {
    264       return; // Already closed.
    265     }
    266     connection.writeSynResetLater(id, rstStatusCode);
    267   }
    268 
    269   /** Returns true if this stream was closed. */
    270   private boolean closeInternal(int rstStatusCode) {
    271     assert (!Thread.holdsLock(this));
    272     synchronized (this) {
    273       if (this.rstStatusCode != -1) {
    274         return false;
    275       }
    276       if (in.finished && out.finished) {
    277         return false;
    278       }
    279       this.rstStatusCode = rstStatusCode;
    280       notifyAll();
    281     }
    282     connection.removeStream(id);
    283     return true;
    284   }
    285 
    286   void receiveReply(List<String> strings) throws IOException {
    287     assert (!Thread.holdsLock(SpdyStream.this));
    288     boolean streamInUseError = false;
    289     boolean open = true;
    290     synchronized (this) {
    291       if (isLocallyInitiated() && responseHeaders == null) {
    292         responseHeaders = strings;
    293         open = isOpen();
    294         notifyAll();
    295       } else {
    296         streamInUseError = true;
    297       }
    298     }
    299     if (streamInUseError) {
    300       closeLater(SpdyStream.RST_STREAM_IN_USE);
    301     } else if (!open) {
    302       connection.removeStream(id);
    303     }
    304   }
    305 
    306   void receiveHeaders(List<String> headers) throws IOException {
    307     assert (!Thread.holdsLock(SpdyStream.this));
    308     boolean protocolError = false;
    309     synchronized (this) {
    310       if (responseHeaders != null) {
    311         List<String> newHeaders = new ArrayList<String>();
    312         newHeaders.addAll(responseHeaders);
    313         newHeaders.addAll(headers);
    314         this.responseHeaders = newHeaders;
    315       } else {
    316         protocolError = true;
    317       }
    318     }
    319     if (protocolError) {
    320       closeLater(SpdyStream.RST_PROTOCOL_ERROR);
    321     }
    322   }
    323 
    324   void receiveData(InputStream in, int length) throws IOException {
    325     assert (!Thread.holdsLock(SpdyStream.this));
    326     this.in.receive(in, length);
    327   }
    328 
    329   void receiveFin() {
    330     assert (!Thread.holdsLock(SpdyStream.this));
    331     boolean open;
    332     synchronized (this) {
    333       this.in.finished = true;
    334       open = isOpen();
    335       notifyAll();
    336     }
    337     if (!open) {
    338       connection.removeStream(id);
    339     }
    340   }
    341 
    342   synchronized void receiveRstStream(int statusCode) {
    343     if (rstStatusCode == -1) {
    344       rstStatusCode = statusCode;
    345       notifyAll();
    346     }
    347   }
    348 
    349   private void setSettings(Settings settings) {
    350     assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'.
    351     this.writeWindowSize =
    352         settings != null ? settings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE)
    353             : Settings.DEFAULT_INITIAL_WINDOW_SIZE;
    354   }
    355 
    356   void receiveSettings(Settings settings) {
    357     assert (Thread.holdsLock(this));
    358     setSettings(settings);
    359     notifyAll();
    360   }
    361 
    362   synchronized void receiveWindowUpdate(int deltaWindowSize) {
    363     out.unacknowledgedBytes -= deltaWindowSize;
    364     notifyAll();
    365   }
    366 
    367   private String rstStatusString() {
    368     return rstStatusCode > 0 && rstStatusCode < STATUS_CODE_NAMES.length
    369         ? STATUS_CODE_NAMES[rstStatusCode] : Integer.toString(rstStatusCode);
    370   }
    371 
    372   int getPriority() {
    373     return priority;
    374   }
    375 
    376   int getSlot() {
    377     return slot;
    378   }
    379 
    380   /**
    381    * An input stream that reads the incoming data frames of a stream. Although
    382    * this class uses synchronization to safely receive incoming data frames,
    383    * it is not intended for use by multiple readers.
    384    */
    385   private final class SpdyDataInputStream extends InputStream {
    386     // Store incoming data bytes in a circular buffer. When the buffer is
    387     // empty, pos == -1. Otherwise pos is the first byte to read and limit
    388     // is the first byte to write.
    389     //
    390     // { - - - X X X X - - - }
    391     //         ^       ^
    392     //        pos    limit
    393     //
    394     // { X X X - - - - X X X }
    395     //         ^       ^
    396     //       limit    pos
    397 
    398     private final byte[] buffer = new byte[Settings.DEFAULT_INITIAL_WINDOW_SIZE];
    399 
    400     /** the next byte to be read, or -1 if the buffer is empty. Never buffer.length */
    401     private int pos = -1;
    402 
    403     /** the last byte to be read. Never buffer.length */
    404     private int limit;
    405 
    406     /** True if the caller has closed this stream. */
    407     private boolean closed;
    408 
    409     /**
    410      * True if either side has cleanly shut down this stream. We will
    411      * receive no more bytes beyond those already in the buffer.
    412      */
    413     private boolean finished;
    414 
    415     /**
    416      * The total number of bytes consumed by the application (with {@link
    417      * #read}), but not yet acknowledged by sending a {@code WINDOW_UPDATE}
    418      * frame.
    419      */
    420     private int unacknowledgedBytes = 0;
    421 
    422     @Override public int available() throws IOException {
    423       synchronized (SpdyStream.this) {
    424         checkNotClosed();
    425         if (pos == -1) {
    426           return 0;
    427         } else if (limit > pos) {
    428           return limit - pos;
    429         } else {
    430           return limit + (buffer.length - pos);
    431         }
    432       }
    433     }
    434 
    435     @Override public int read() throws IOException {
    436       return Util.readSingleByte(this);
    437     }
    438 
    439     @Override public int read(byte[] b, int offset, int count) throws IOException {
    440       synchronized (SpdyStream.this) {
    441         checkOffsetAndCount(b.length, offset, count);
    442         waitUntilReadable();
    443         checkNotClosed();
    444 
    445         if (pos == -1) {
    446           return -1;
    447         }
    448 
    449         int copied = 0;
    450 
    451         // drain from [pos..buffer.length)
    452         if (limit <= pos) {
    453           int bytesToCopy = Math.min(count, buffer.length - pos);
    454           System.arraycopy(buffer, pos, b, offset, bytesToCopy);
    455           pos += bytesToCopy;
    456           copied += bytesToCopy;
    457           if (pos == buffer.length) {
    458             pos = 0;
    459           }
    460         }
    461 
    462         // drain from [pos..limit)
    463         if (copied < count) {
    464           int bytesToCopy = Math.min(limit - pos, count - copied);
    465           System.arraycopy(buffer, pos, b, offset + copied, bytesToCopy);
    466           pos += bytesToCopy;
    467           copied += bytesToCopy;
    468         }
    469 
    470         // Flow control: notify the peer that we're ready for more data!
    471         unacknowledgedBytes += copied;
    472         if (unacknowledgedBytes >= WINDOW_UPDATE_THRESHOLD) {
    473           connection.writeWindowUpdateLater(id, unacknowledgedBytes);
    474           unacknowledgedBytes = 0;
    475         }
    476 
    477         if (pos == limit) {
    478           pos = -1;
    479           limit = 0;
    480         }
    481 
    482         return copied;
    483       }
    484     }
    485 
    486     /**
    487      * Returns once the input stream is either readable or finished. Throws
    488      * a {@link SocketTimeoutException} if the read timeout elapses before
    489      * that happens.
    490      */
    491     private void waitUntilReadable() throws IOException {
    492       long start = 0;
    493       long remaining = 0;
    494       if (readTimeoutMillis != 0) {
    495         start = (System.nanoTime() / 1000000);
    496         remaining = readTimeoutMillis;
    497       }
    498       try {
    499         while (pos == -1 && !finished && !closed && rstStatusCode == -1) {
    500           if (readTimeoutMillis == 0) {
    501             SpdyStream.this.wait();
    502           } else if (remaining > 0) {
    503             SpdyStream.this.wait(remaining);
    504             remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
    505           } else {
    506             throw new SocketTimeoutException();
    507           }
    508         }
    509       } catch (InterruptedException e) {
    510         throw new InterruptedIOException();
    511       }
    512     }
    513 
    514     void receive(InputStream in, int byteCount) throws IOException {
    515       assert (!Thread.holdsLock(SpdyStream.this));
    516 
    517       if (byteCount == 0) {
    518         return;
    519       }
    520 
    521       int pos;
    522       int limit;
    523       int firstNewByte;
    524       boolean finished;
    525       boolean flowControlError;
    526       synchronized (SpdyStream.this) {
    527         finished = this.finished;
    528         pos = this.pos;
    529         firstNewByte = this.limit;
    530         limit = this.limit;
    531         flowControlError = byteCount > buffer.length - available();
    532       }
    533 
    534       // If the peer sends more data than we can handle, discard it and close the connection.
    535       if (flowControlError) {
    536         Util.skipByReading(in, byteCount);
    537         closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
    538         return;
    539       }
    540 
    541       // Discard data received after the stream is finished. It's probably a benign race.
    542       if (finished) {
    543         Util.skipByReading(in, byteCount);
    544         return;
    545       }
    546 
    547       // Fill the buffer without holding any locks. First fill [limit..buffer.length) if that
    548       // won't overwrite unread data. Then fill [limit..pos). We can't hold a lock, otherwise
    549       // writes will be blocked until reads complete.
    550       if (pos < limit) {
    551         int firstCopyCount = Math.min(byteCount, buffer.length - limit);
    552         Util.readFully(in, buffer, limit, firstCopyCount);
    553         limit += firstCopyCount;
    554         byteCount -= firstCopyCount;
    555         if (limit == buffer.length) {
    556           limit = 0;
    557         }
    558       }
    559       if (byteCount > 0) {
    560         Util.readFully(in, buffer, limit, byteCount);
    561         limit += byteCount;
    562       }
    563 
    564       synchronized (SpdyStream.this) {
    565         // Update the new limit, and mark the position as readable if necessary.
    566         this.limit = limit;
    567         if (this.pos == -1) {
    568           this.pos = firstNewByte;
    569           SpdyStream.this.notifyAll();
    570         }
    571       }
    572     }
    573 
    574     @Override public void close() throws IOException {
    575       synchronized (SpdyStream.this) {
    576         closed = true;
    577         SpdyStream.this.notifyAll();
    578       }
    579       cancelStreamIfNecessary();
    580     }
    581 
    582     private void checkNotClosed() throws IOException {
    583       if (closed) {
    584         throw new IOException("stream closed");
    585       }
    586       if (rstStatusCode != -1) {
    587         throw new IOException("stream was reset: " + rstStatusString());
    588       }
    589     }
    590   }
    591 
    592   private void cancelStreamIfNecessary() throws IOException {
    593     assert (!Thread.holdsLock(SpdyStream.this));
    594     boolean open;
    595     boolean cancel;
    596     synchronized (this) {
    597       cancel = !in.finished && in.closed && (out.finished || out.closed);
    598       open = isOpen();
    599     }
    600     if (cancel) {
    601       // RST this stream to prevent additional data from being sent. This
    602       // is safe because the input stream is closed (we won't use any
    603       // further bytes) and the output stream is either finished or closed
    604       // (so RSTing both streams doesn't cause harm).
    605       SpdyStream.this.close(RST_CANCEL);
    606     } else if (!open) {
    607       connection.removeStream(id);
    608     }
    609   }
    610 
    611   /**
    612    * An output stream that writes outgoing data frames of a stream. This class
    613    * is not thread safe.
    614    */
    615   private final class SpdyDataOutputStream extends OutputStream {
    616     private final byte[] buffer = new byte[8192];
    617     private int pos = DATA_FRAME_HEADER_LENGTH;
    618 
    619     /** True if the caller has closed this stream. */
    620     private boolean closed;
    621 
    622     /**
    623      * True if either side has cleanly shut down this stream. We shall send
    624      * no more bytes.
    625      */
    626     private boolean finished;
    627 
    628     /**
    629      * The total number of bytes written out to the peer, but not yet
    630      * acknowledged with an incoming {@code WINDOW_UPDATE} frame. Writes
    631      * block if they cause this to exceed the {@code WINDOW_SIZE}.
    632      */
    633     private int unacknowledgedBytes = 0;
    634 
    635     @Override public void write(int b) throws IOException {
    636       Util.writeSingleByte(this, b);
    637     }
    638 
    639     @Override public void write(byte[] bytes, int offset, int count) throws IOException {
    640       assert (!Thread.holdsLock(SpdyStream.this));
    641       checkOffsetAndCount(bytes.length, offset, count);
    642       checkNotClosed();
    643 
    644       while (count > 0) {
    645         if (pos == buffer.length) {
    646           writeFrame(false);
    647         }
    648         int bytesToCopy = Math.min(count, buffer.length - pos);
    649         System.arraycopy(bytes, offset, buffer, pos, bytesToCopy);
    650         pos += bytesToCopy;
    651         offset += bytesToCopy;
    652         count -= bytesToCopy;
    653       }
    654     }
    655 
    656     @Override public void flush() throws IOException {
    657       assert (!Thread.holdsLock(SpdyStream.this));
    658       checkNotClosed();
    659       if (pos > DATA_FRAME_HEADER_LENGTH) {
    660         writeFrame(false);
    661         connection.flush();
    662       }
    663     }
    664 
    665     @Override public void close() throws IOException {
    666       assert (!Thread.holdsLock(SpdyStream.this));
    667       synchronized (SpdyStream.this) {
    668         if (closed) {
    669           return;
    670         }
    671         closed = true;
    672       }
    673       if (!out.finished) {
    674         writeFrame(true);
    675       }
    676       connection.flush();
    677       cancelStreamIfNecessary();
    678     }
    679 
    680     private void writeFrame(boolean last) throws IOException {
    681       assert (!Thread.holdsLock(SpdyStream.this));
    682 
    683       int length = pos - DATA_FRAME_HEADER_LENGTH;
    684       synchronized (SpdyStream.this) {
    685         waitUntilWritable(length, last);
    686         unacknowledgedBytes += length;
    687       }
    688       int flags = 0;
    689       if (last) {
    690         flags |= SpdyConnection.FLAG_FIN;
    691       }
    692       pokeInt(buffer, 0, id & 0x7fffffff, BIG_ENDIAN);
    693       pokeInt(buffer, 4, (flags & 0xff) << 24 | length & 0xffffff, BIG_ENDIAN);
    694       connection.writeFrame(buffer, 0, pos);
    695       pos = DATA_FRAME_HEADER_LENGTH;
    696     }
    697 
    698     /**
    699      * Returns once the peer is ready to receive {@code count} bytes.
    700      *
    701      * @throws IOException if the stream was finished or closed, or the
    702      * thread was interrupted.
    703      */
    704     private void waitUntilWritable(int count, boolean last) throws IOException {
    705       try {
    706         while (unacknowledgedBytes + count >= writeWindowSize) {
    707           SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE.
    708 
    709           // The stream may have been closed or reset while we were waiting!
    710           if (!last && closed) {
    711             throw new IOException("stream closed");
    712           } else if (finished) {
    713             throw new IOException("stream finished");
    714           } else if (rstStatusCode != -1) {
    715             throw new IOException("stream was reset: " + rstStatusString());
    716           }
    717         }
    718       } catch (InterruptedException e) {
    719         throw new InterruptedIOException();
    720       }
    721     }
    722 
    723     private void checkNotClosed() throws IOException {
    724       synchronized (SpdyStream.this) {
    725         if (closed) {
    726           throw new IOException("stream closed");
    727         } else if (finished) {
    728           throw new IOException("stream finished");
    729         } else if (rstStatusCode != -1) {
    730           throw new IOException("stream was reset: " + rstStatusString());
    731         }
    732       }
    733     }
    734   }
    735 }
    736