Home | History | Annotate | Download | only in spdy
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package com.squareup.okhttp.internal.spdy;
     17 
     18 import com.squareup.okhttp.Protocol;
     19 import com.squareup.okhttp.internal.NamedRunnable;
     20 import com.squareup.okhttp.internal.Util;
     21 import java.io.Closeable;
     22 import java.io.IOException;
     23 import java.io.InterruptedIOException;
     24 import java.net.InetSocketAddress;
     25 import java.net.Socket;
     26 import java.util.HashMap;
     27 import java.util.Iterator;
     28 import java.util.LinkedHashSet;
     29 import java.util.List;
     30 import java.util.Map;
     31 import java.util.Set;
     32 import java.util.concurrent.ExecutorService;
     33 import java.util.concurrent.SynchronousQueue;
     34 import java.util.concurrent.ThreadPoolExecutor;
     35 import java.util.concurrent.TimeUnit;
     36 import okio.BufferedSink;
     37 import okio.BufferedSource;
     38 import okio.ByteString;
     39 import okio.OkBuffer;
     40 import okio.Okio;
     41 
     42 import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
     43 
     44 /**
     45  * A socket connection to a remote peer. A connection hosts streams which can
     46  * send and receive data.
     47  *
     48  * <p>Many methods in this API are <strong>synchronous:</strong> the call is
     49  * completed before the method returns. This is typical for Java but atypical
     50  * for SPDY. This is motivated by exception transparency: an IOException that
     51  * was triggered by a certain caller can be caught and handled by that caller.
     52  */
     53 public final class SpdyConnection implements Closeable {
     54 
     55   // Internal state of this connection is guarded by 'this'. No blocking
     56   // operations may be performed while holding this lock!
     57   //
     58   // Socket writes are guarded by frameWriter.
     59   //
     60   // Socket reads are unguarded but are only made by the reader thread.
     61   //
     62   // Certain operations (like SYN_STREAM) need to synchronize on both the
     63   // frameWriter (to do blocking I/O) and this (to create streams). Such
     64   // operations must synchronize on 'this' last. This ensures that we never
     65   // wait for a blocking operation while holding 'this'.
     66 
     67   private static final ExecutorService executor = new ThreadPoolExecutor(0,
     68       Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
     69       Util.threadFactory("OkHttp SpdyConnection", true));
     70 
     71   /** The protocol variant, like {@link com.squareup.okhttp.internal.spdy.Spdy3}. */
     72   final Protocol protocol;
     73 
     74   /** True if this peer initiated the connection. */
     75   final boolean client;
     76 
     77   /**
     78    * User code to run in response to an incoming stream. Callbacks must not be
     79    * run on the callback executor.
     80    */
     81   private final IncomingStreamHandler handler;
     82   private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>();
     83   private final String hostName;
     84   private int lastGoodStreamId;
     85   private int nextStreamId;
     86   private boolean shutdown;
     87   private long idleStartTimeNs = System.nanoTime();
     88 
     89   /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
     90   private Map<Integer, Ping> pings;
     91   /** User code to run in response to push promise events. */
     92   private final PushObserver pushObserver;
     93   private int nextPingId;
     94 
     95   /**
     96    * The total number of bytes consumed by the application, but not yet
     97    * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
     98    */
     99   // Visible for testing
    100   long unacknowledgedBytesRead = 0;
    101 
    102   /**
    103    * Count of bytes that can be written on the connection before receiving a
    104    * window update.
    105    */
    106   // Visible for testing
    107   long bytesLeftInWriteWindow;
    108 
    109   /** Settings we communicate to the peer. */
    110   // TODO: Do we want to dynamically adjust settings, or KISS and only set once?
    111   final Settings okHttpSettings = new Settings();
    112       // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
    113 
    114   /** Settings we receive from the peer. */
    115   // TODO: MWS will need to guard on this setting before attempting to push.
    116   final Settings peerSettings = new Settings();
    117 
    118   private boolean receivedInitialPeerSettings = false;
    119   final FrameReader frameReader;
    120   final FrameWriter frameWriter;
    121   final long maxFrameSize;
    122 
    123   // Visible for testing
    124   final Reader readerRunnable;
    125 
    126   private SpdyConnection(Builder builder) {
    127     protocol = builder.protocol;
    128     pushObserver = builder.pushObserver;
    129     client = builder.client;
    130     handler = builder.handler;
    131     nextStreamId = builder.client ? 1 : 2;
    132     nextPingId = builder.client ? 1 : 2;
    133 
    134     // Flow control was designed more for servers, or proxies than edge clients.
    135     // If we are a client, set the flow control window to 16MiB.  This avoids
    136     // thrashing window updates every 64KiB, yet small enough to avoid blowing
    137     // up the heap.
    138     if (builder.client) {
    139       okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 16 * 1024 * 1024);
    140     }
    141 
    142     hostName = builder.hostName;
    143 
    144     Variant variant;
    145     if (protocol == Protocol.HTTP_2) {
    146       variant = new Http20Draft09();
    147     } else if (protocol == Protocol.SPDY_3) {
    148       variant = new Spdy3();
    149     } else {
    150       throw new AssertionError(protocol);
    151     }
    152     bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
    153     frameReader = variant.newReader(builder.source, client);
    154     frameWriter = variant.newWriter(builder.sink, client);
    155     maxFrameSize = variant.maxFrameSize();
    156 
    157     readerRunnable = new Reader();
    158     new Thread(readerRunnable).start(); // Not a daemon thread.
    159   }
    160 
    161   /** The protocol as selected using NPN or ALPN. */
    162   public Protocol getProtocol() {
    163     return protocol;
    164   }
    165 
    166   /**
    167    * Returns the number of {@link SpdyStream#isOpen() open streams} on this
    168    * connection.
    169    */
    170   public synchronized int openStreamCount() {
    171     return streams.size();
    172   }
    173 
    174   synchronized SpdyStream getStream(int id) {
    175     return streams.get(id);
    176   }
    177 
    178   synchronized SpdyStream removeStream(int streamId) {
    179     SpdyStream stream = streams.remove(streamId);
    180     if (stream != null && streams.isEmpty()) {
    181       setIdle(true);
    182     }
    183     return stream;
    184   }
    185 
    186   private synchronized void setIdle(boolean value) {
    187     idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE;
    188   }
    189 
    190   /** Returns true if this connection is idle. */
    191   public synchronized boolean isIdle() {
    192     return idleStartTimeNs != Long.MAX_VALUE;
    193   }
    194 
    195   /**
    196    * Returns the time in ns when this connection became idle or Long.MAX_VALUE
    197    * if connection is not idle.
    198    */
    199   public synchronized long getIdleStartTimeNs() {
    200     return idleStartTimeNs;
    201   }
    202 
    203   /**
    204    * Returns a new server-initiated stream.
    205    *
    206    * @param associatedStreamId the stream that triggered the sender to create
    207    *     this stream.
    208    * @param out true to create an output stream that we can use to send data
    209    *     to the remote peer. Corresponds to {@code FLAG_FIN}.
    210    */
    211   public SpdyStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
    212       throws IOException {
    213     if (client) throw new IllegalStateException("Client cannot push requests.");
    214     if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2");
    215     return newStream(associatedStreamId, requestHeaders, out, false);
    216   }
    217 
    218   /**
    219    * Returns a new locally-initiated stream.
    220    *
    221    * @param out true to create an output stream that we can use to send data to the remote peer.
    222    *     Corresponds to {@code FLAG_FIN}.
    223    * @param in true to create an input stream that the remote peer can use to send data to us.
    224    *     Corresponds to {@code FLAG_UNIDIRECTIONAL}.
    225    */
    226   public SpdyStream newStream(List<Header> requestHeaders, boolean out, boolean in)
    227       throws IOException {
    228     return newStream(0, requestHeaders, out, in);
    229   }
    230 
    231   private SpdyStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
    232       boolean in) throws IOException {
    233     boolean outFinished = !out;
    234     boolean inFinished = !in;
    235     int priority = -1; // TODO: permit the caller to specify a priority?
    236     int slot = 0; // TODO: permit the caller to specify a slot?
    237     SpdyStream stream;
    238     int streamId;
    239 
    240     synchronized (frameWriter) {
    241       synchronized (this) {
    242         if (shutdown) {
    243           throw new IOException("shutdown");
    244         }
    245         streamId = nextStreamId;
    246         nextStreamId += 2;
    247         stream = new SpdyStream(streamId, this, outFinished, inFinished, priority, requestHeaders);
    248         if (stream.isOpen()) {
    249           streams.put(streamId, stream);
    250           setIdle(false);
    251         }
    252       }
    253       if (associatedStreamId == 0) {
    254         frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, priority, slot,
    255             requestHeaders);
    256       } else if (client) {
    257         throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
    258       } else { // HTTP/2 has a PUSH_PROMISE frame.
    259         frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
    260       }
    261     }
    262 
    263     if (!out) {
    264       frameWriter.flush();
    265     }
    266 
    267     return stream;
    268   }
    269 
    270   void writeSynReply(int streamId, boolean outFinished, List<Header> alternating)
    271       throws IOException {
    272     frameWriter.synReply(outFinished, streamId, alternating);
    273   }
    274 
    275   /**
    276    * Callers of this method are not thread safe, and sometimes on application
    277    * threads.  Most often, this method will be called to send a buffer worth of
    278    * data to the peer.
    279    * <p>
    280    * Writes are subject to the write window of the stream and the connection.
    281    * Until there is a window sufficient to send {@code byteCount}, the caller
    282    * will block.  For example, a user of {@code HttpURLConnection} who flushes
    283    * more bytes to the output stream than the connection's write window will
    284    * block.
    285    * <p>
    286    * Zero {@code byteCount} writes are not subject to flow control and
    287    * will not block.  The only use case for zero {@code byteCount} is closing
    288    * a flushed output stream.
    289    */
    290   public void writeData(int streamId, boolean outFinished, OkBuffer buffer, long byteCount)
    291       throws IOException {
    292     if (byteCount == 0) { // Empty data frames are not flow-controlled.
    293       frameWriter.data(outFinished, streamId, buffer, 0);
    294       return;
    295     }
    296 
    297     while (byteCount > 0) {
    298       int toWrite;
    299       synchronized (SpdyConnection.this) {
    300         try {
    301           while (bytesLeftInWriteWindow <= 0) {
    302             SpdyConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
    303           }
    304         } catch (InterruptedException e) {
    305           throw new InterruptedIOException();
    306         }
    307 
    308         toWrite = (int) Math.min(Math.min(byteCount, bytesLeftInWriteWindow), maxFrameSize);
    309         bytesLeftInWriteWindow -= toWrite;
    310       }
    311 
    312       byteCount -= toWrite;
    313       frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
    314     }
    315   }
    316 
    317   /**
    318    * {@code delta} will be negative if a settings frame initial window is
    319    * smaller than the last.
    320    */
    321   void addBytesToWriteWindow(long delta) {
    322     bytesLeftInWriteWindow += delta;
    323     if (delta > 0) SpdyConnection.this.notifyAll();
    324   }
    325 
    326   void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
    327     executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
    328       @Override public void execute() {
    329         try {
    330           writeSynReset(streamId, errorCode);
    331         } catch (IOException ignored) {
    332         }
    333       }
    334     });
    335   }
    336 
    337   void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
    338     frameWriter.rstStream(streamId, statusCode);
    339   }
    340 
    341   void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
    342     executor.submit(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
    343       @Override public void execute() {
    344         try {
    345           frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
    346         } catch (IOException ignored) {
    347         }
    348       }
    349     });
    350   }
    351 
    352   /**
    353    * Sends a ping frame to the peer. Use the returned object to await the
    354    * ping's response and observe its round trip time.
    355    */
    356   public Ping ping() throws IOException {
    357     Ping ping = new Ping();
    358     int pingId;
    359     synchronized (this) {
    360       if (shutdown) {
    361         throw new IOException("shutdown");
    362       }
    363       pingId = nextPingId;
    364       nextPingId += 2;
    365       if (pings == null) pings = new HashMap<Integer, Ping>();
    366       pings.put(pingId, ping);
    367     }
    368     writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping);
    369     return ping;
    370   }
    371 
    372   private void writePingLater(
    373       final boolean reply, final int payload1, final int payload2, final Ping ping) {
    374     executor.submit(new NamedRunnable("OkHttp %s ping %08x%08x",
    375         hostName, payload1, payload2) {
    376       @Override public void execute() {
    377         try {
    378           writePing(reply, payload1, payload2, ping);
    379         } catch (IOException ignored) {
    380         }
    381       }
    382     });
    383   }
    384 
    385   private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
    386     synchronized (frameWriter) {
    387       // Observe the sent time immediately before performing I/O.
    388       if (ping != null) ping.send();
    389       frameWriter.ping(reply, payload1, payload2);
    390     }
    391   }
    392 
    393   private synchronized Ping removePing(int id) {
    394     return pings != null ? pings.remove(id) : null;
    395   }
    396 
    397   public void flush() throws IOException {
    398     frameWriter.flush();
    399   }
    400 
    401   /**
    402    * Degrades this connection such that new streams can neither be created
    403    * locally, nor accepted from the remote peer. Existing streams are not
    404    * impacted. This is intended to permit an endpoint to gracefully stop
    405    * accepting new requests without harming previously established streams.
    406    */
    407   public void shutdown(ErrorCode statusCode) throws IOException {
    408     synchronized (frameWriter) {
    409       int lastGoodStreamId;
    410       synchronized (this) {
    411         if (shutdown) {
    412           return;
    413         }
    414         shutdown = true;
    415         lastGoodStreamId = this.lastGoodStreamId;
    416       }
    417       // TODO: propagate exception message into debugData
    418       frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
    419     }
    420   }
    421 
    422   /**
    423    * Closes this connection. This cancels all open streams and unanswered
    424    * pings. It closes the underlying input and output streams and shuts down
    425    * internal executor services.
    426    */
    427   @Override public void close() throws IOException {
    428     close(ErrorCode.NO_ERROR, ErrorCode.CANCEL);
    429   }
    430 
    431   private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
    432     assert (!Thread.holdsLock(this));
    433     IOException thrown = null;
    434     try {
    435       shutdown(connectionCode);
    436     } catch (IOException e) {
    437       thrown = e;
    438     }
    439 
    440     SpdyStream[] streamsToClose = null;
    441     Ping[] pingsToCancel = null;
    442     synchronized (this) {
    443       if (!streams.isEmpty()) {
    444         streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]);
    445         streams.clear();
    446         setIdle(false);
    447       }
    448       if (pings != null) {
    449         pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
    450         pings = null;
    451       }
    452     }
    453 
    454     if (streamsToClose != null) {
    455       for (SpdyStream stream : streamsToClose) {
    456         try {
    457           stream.close(streamCode);
    458         } catch (IOException e) {
    459           if (thrown != null) thrown = e;
    460         }
    461       }
    462     }
    463 
    464     if (pingsToCancel != null) {
    465       for (Ping ping : pingsToCancel) {
    466         ping.cancel();
    467       }
    468     }
    469 
    470     try {
    471       frameReader.close();
    472     } catch (IOException e) {
    473       thrown = e;
    474     }
    475     try {
    476       frameWriter.close();
    477     } catch (IOException e) {
    478       if (thrown == null) thrown = e;
    479     }
    480 
    481     if (thrown != null) throw thrown;
    482   }
    483 
    484   /**
    485    * Sends a connection header if the current variant requires it. This should
    486    * be called after {@link Builder#build} for all new connections.
    487    */
    488   public void sendConnectionHeader() throws IOException {
    489     frameWriter.connectionHeader();
    490     frameWriter.settings(okHttpSettings);
    491   }
    492 
    493   public static class Builder {
    494     private String hostName;
    495     private BufferedSource source;
    496     private BufferedSink sink;
    497     private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
    498     private Protocol protocol = Protocol.SPDY_3;
    499     private PushObserver pushObserver = PushObserver.CANCEL;
    500     private boolean client;
    501 
    502     public Builder(boolean client, Socket socket) throws IOException {
    503       this(((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), client, socket);
    504     }
    505 
    506     /**
    507      * @param client true if this peer initiated the connection; false if this
    508      *     peer accepted the connection.
    509      */
    510     public Builder(String hostName, boolean client, Socket socket) throws IOException {
    511       this.hostName = hostName;
    512       this.client = client;
    513       this.source = Okio.buffer(Okio.source(socket.getInputStream()));
    514       this.sink = Okio.buffer(Okio.sink(socket.getOutputStream()));
    515     }
    516 
    517     public Builder handler(IncomingStreamHandler handler) {
    518       this.handler = handler;
    519       return this;
    520     }
    521 
    522     public Builder protocol(Protocol protocol) {
    523       this.protocol = protocol;
    524       return this;
    525     }
    526 
    527     public Builder pushObserver(PushObserver pushObserver) {
    528       this.pushObserver = pushObserver;
    529       return this;
    530     }
    531 
    532     public SpdyConnection build() {
    533       return new SpdyConnection(this);
    534     }
    535   }
    536 
    537   /**
    538    * Methods in this class must not lock FrameWriter.  If a method needs to
    539    * write a frame, create an async task to do so.
    540    */
    541   class Reader extends NamedRunnable implements FrameReader.Handler {
    /** Creates the reader task, named after the connection's host name. */
    private Reader() {
      super("OkHttp %s", hostName);
    }
    545 
    546     @Override protected void execute() {
    547       ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
    548       ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
    549       try {
    550         if (!client) {
    551           frameReader.readConnectionHeader();
    552         }
    553         while (frameReader.nextFrame(this)) {
    554         }
    555         connectionErrorCode = ErrorCode.NO_ERROR;
    556         streamErrorCode = ErrorCode.CANCEL;
    557       } catch (IOException e) {
    558         connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
    559         streamErrorCode = ErrorCode.PROTOCOL_ERROR;
    560       } finally {
    561         try {
    562           close(connectionErrorCode, streamErrorCode);
    563         } catch (IOException ignored) {
    564         }
    565       }
    566     }
    567 
    568     @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
    569         throws IOException {
    570       if (pushedStream(streamId)) {
    571         pushDataLater(streamId, source, length, inFinished);
    572         return;
    573       }
    574       SpdyStream dataStream = getStream(streamId);
    575       if (dataStream == null) {
    576         writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
    577         source.skip(length);
    578         return;
    579       }
    580       dataStream.receiveData(source, length);
    581       if (inFinished) {
    582         dataStream.receiveFin();
    583       }
    584     }
    585 
    586     @Override public void headers(boolean outFinished, boolean inFinished, int streamId,
    587         int associatedStreamId, int priority, List<Header> headerBlock, HeadersMode headersMode) {
    588       if (pushedStream(streamId)) {
    589         pushHeadersLater(streamId, headerBlock, inFinished);
    590         return;
    591       }
    592       SpdyStream stream;
    593       synchronized (SpdyConnection.this) {
    594         // If we're shutdown, don't bother with this stream.
    595         if (shutdown) return;
    596 
    597         stream = getStream(streamId);
    598 
    599         if (stream == null) {
    600           // The headers claim to be for an existing stream, but we don't have one.
    601           if (headersMode.failIfStreamAbsent()) {
    602             writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
    603             return;
    604           }
    605 
    606           // If the stream ID is less than the last created ID, assume it's already closed.
    607           if (streamId <= lastGoodStreamId) return;
    608 
    609           // If the stream ID is in the client's namespace, assume it's already closed.
    610           if (streamId % 2 == nextStreamId % 2) return;
    611 
    612           // Create a stream.
    613           final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished,
    614               inFinished, priority, headerBlock);
    615           lastGoodStreamId = streamId;
    616           streams.put(streamId, newStream);
    617           executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
    618             @Override public void execute() {
    619               try {
    620                 handler.receive(newStream);
    621               } catch (IOException e) {
    622                 throw new RuntimeException(e);
    623               }
    624             }
    625           });
    626           return;
    627         }
    628       }
    629 
    630       // The headers claim to be for a new stream, but we already have one.
    631       if (headersMode.failIfStreamPresent()) {
    632         stream.closeLater(ErrorCode.PROTOCOL_ERROR);
    633         removeStream(streamId);
    634         return;
    635       }
    636 
    637       // Update an existing stream.
    638       stream.receiveHeaders(headerBlock, headersMode);
    639       if (inFinished) stream.receiveFin();
    640     }
    641 
    642     @Override public void rstStream(int streamId, ErrorCode errorCode) {
    643       if (pushedStream(streamId)) {
    644         pushResetLater(streamId, errorCode);
    645         return;
    646       }
    647       SpdyStream rstStream = removeStream(streamId);
    648       if (rstStream != null) {
    649         rstStream.receiveRstStream(errorCode);
    650       }
    651     }
    652 
    653     @Override public void settings(boolean clearPrevious, Settings newSettings) {
    654       long delta = 0;
    655       SpdyStream[] streamsToNotify = null;
    656       synchronized (SpdyConnection.this) {
    657         int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
    658         if (clearPrevious) peerSettings.clear();
    659         peerSettings.merge(newSettings);
    660         if (getProtocol() == Protocol.HTTP_2) {
    661           ackSettingsLater();
    662         }
    663         int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
    664         if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
    665           delta = peerInitialWindowSize - priorWriteWindowSize;
    666           if (!receivedInitialPeerSettings) {
    667             addBytesToWriteWindow(delta);
    668             receivedInitialPeerSettings = true;
    669           }
    670           if (!streams.isEmpty()) {
    671             streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
    672           }
    673         }
    674       }
    675       if (streamsToNotify != null && delta != 0) {
    676         for (SpdyStream stream : streams.values()) {
    677           synchronized (stream) {
    678             stream.addBytesToWriteWindow(delta);
    679           }
    680         }
    681       }
    682     }
    683 
    684     private void ackSettingsLater() {
    685       executor.submit(new NamedRunnable("OkHttp %s ACK Settings", hostName) {
    686         @Override public void execute() {
    687           try {
    688             frameWriter.ackSettings();
    689           } catch (IOException ignored) {
    690           }
    691         }
    692       });
    693     }
    694 
    695     @Override public void ackSettings() {
    696       // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
    697     }
    698 
    699     @Override public void ping(boolean reply, int payload1, int payload2) {
    700       if (reply) {
    701         Ping ping = removePing(payload1);
    702         if (ping != null) {
    703           ping.receive();
    704         }
    705       } else {
    706         // Send a reply to a client ping if this is a server and vice versa.
    707         writePingLater(true, payload1, payload2, null);
    708       }
    709     }
    710 
    711     @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
    712       if (debugData.size() > 0) { // TODO: log the debugData
    713       }
    714       synchronized (SpdyConnection.this) {
    715         shutdown = true;
    716 
    717         // Fail all streams created after the last good stream ID.
    718         for (Iterator<Map.Entry<Integer, SpdyStream>> i = streams.entrySet().iterator();
    719             i.hasNext(); ) {
    720           Map.Entry<Integer, SpdyStream> entry = i.next();
    721           int streamId = entry.getKey();
    722           if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) {
    723             entry.getValue().receiveRstStream(ErrorCode.REFUSED_STREAM);
    724             i.remove();
    725           }
    726         }
    727       }
    728     }
    729 
    730     @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
    731       if (streamId == 0) {
    732         synchronized (SpdyConnection.this) {
    733           bytesLeftInWriteWindow += windowSizeIncrement;
    734           SpdyConnection.this.notifyAll();
    735         }
    736       } else {
    737         SpdyStream stream = getStream(streamId);
    738         if (stream != null) {
    739           synchronized (stream) {
    740             stream.addBytesToWriteWindow(windowSizeIncrement);
    741           }
    742         }
    743       }
    744     }
    745 
    746     @Override public void priority(int streamId, int priority) {
    747       // TODO: honor priority.
    748     }
    749 
    750     @Override
    751     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
    752       pushRequestLater(promisedStreamId, requestHeaders);
    753     }
    754   }
    755 
    756   /** Even, positive numbered streams are pushed streams in HTTP/2. */
    757   private boolean pushedStream(int streamId) {
    758     return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
    759   }
    760 
    761   // Guarded by this.
    762   private final Set<Integer> currentPushRequests = new LinkedHashSet<Integer>();
    763 
    764   private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
    765     synchronized (this) {
    766       if (currentPushRequests.contains(streamId)) {
    767         writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
    768         return;
    769       }
    770       currentPushRequests.add(streamId);
    771     }
    772     executor.submit(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
    773       @Override public void execute() {
    774         boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
    775         try {
    776           if (cancel) {
    777             frameWriter.rstStream(streamId, ErrorCode.CANCEL);
    778             synchronized (SpdyConnection.this) {
    779               currentPushRequests.remove(streamId);
    780             }
    781           }
    782         } catch (IOException ignored) {
    783         }
    784       }
    785     });
    786   }
    787 
    788   private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
    789       final boolean inFinished) {
    790     executor.submit(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
    791       @Override public void execute() {
    792         boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
    793         try {
    794           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
    795           if (cancel || inFinished) {
    796             synchronized (SpdyConnection.this) {
    797               currentPushRequests.remove(streamId);
    798             }
    799           }
    800         } catch (IOException ignored) {
    801         }
    802       }
    803     });
    804   }
    805 
    806   /**
    807    * Eagerly reads {@code byteCount} bytes from the source before launching a background task to
    808    * process the data.  This avoids corrupting the stream.
    809    */
    810   private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
    811       final boolean inFinished) throws IOException {
    812     final OkBuffer buffer = new OkBuffer();
    813     source.require(byteCount); // Eagerly read the frame before firing client thread.
    814     source.read(buffer, byteCount);
    815     if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
    816     executor.submit(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
    817       @Override public void execute() {
    818         try {
    819           boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
    820           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
    821           if (cancel || inFinished) {
    822             synchronized (SpdyConnection.this) {
    823               currentPushRequests.remove(streamId);
    824             }
    825           }
    826         } catch (IOException ignored) {
    827         }
    828       }
    829     });
    830   }
    831 
    832   private void pushResetLater(final int streamId, final ErrorCode errorCode) {
    833     executor.submit(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
    834       @Override public void execute() {
    835         pushObserver.onReset(streamId, errorCode);
    836         synchronized (SpdyConnection.this) {
    837           currentPushRequests.remove(streamId);
    838         }
    839       }
    840     });
    841   }
    842 }
    843