Home | History | Annotate | Download | only in spdy
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package com.squareup.okhttp.internal.spdy;
     17 
     18 import com.squareup.okhttp.Protocol;
     19 import com.squareup.okhttp.internal.NamedRunnable;
     20 import com.squareup.okhttp.internal.Util;
     21 import java.io.Closeable;
     22 import java.io.IOException;
     23 import java.io.InterruptedIOException;
     24 import java.net.InetSocketAddress;
     25 import java.net.Socket;
     26 import java.util.HashMap;
     27 import java.util.LinkedHashSet;
     28 import java.util.List;
     29 import java.util.Map;
     30 import java.util.Set;
     31 import java.util.concurrent.ExecutorService;
     32 import java.util.concurrent.LinkedBlockingQueue;
     33 import java.util.concurrent.SynchronousQueue;
     34 import java.util.concurrent.ThreadPoolExecutor;
     35 import java.util.concurrent.TimeUnit;
     36 import okio.Buffer;
     37 import okio.BufferedSource;
     38 import okio.ByteString;
     39 import okio.Okio;
     40 
     41 import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
     42 
     43 /**
     44  * A socket connection to a remote peer. A connection hosts streams which can
     45  * send and receive data.
     46  *
     47  * <p>Many methods in this API are <strong>synchronous:</strong> the call is
     48  * completed before the method returns. This is typical for Java but atypical
     49  * for SPDY. This is motivated by exception transparency: an IOException that
     50  * was triggered by a certain caller can be caught and handled by that caller.
     51  */
     52 public final class SpdyConnection implements Closeable {
     53 
     54   // Internal state of this connection is guarded by 'this'. No blocking
     55   // operations may be performed while holding this lock!
     56   //
     57   // Socket writes are guarded by frameWriter.
     58   //
     59   // Socket reads are unguarded but are only made by the reader thread.
     60   //
     61   // Certain operations (like SYN_STREAM) need to synchronize on both the
     62   // frameWriter (to do blocking I/O) and this (to create streams). Such
     63   // operations must synchronize on 'this' last. This ensures that we never
     64   // wait for a blocking operation while holding 'this'.
     65 
     66   private static final ExecutorService executor = new ThreadPoolExecutor(0,
     67       Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
     68       Util.threadFactory("OkHttp SpdyConnection", true));
     69 
     70   /** The protocol variant, like {@link com.squareup.okhttp.internal.spdy.Spdy3}. */
     71   final Protocol protocol;
     72 
     73   /** True if this peer initiated the connection. */
     74   final boolean client;
     75 
     76   /**
     77    * User code to run in response to an incoming stream. Callbacks must not be
     78    * run on the callback executor.
     79    */
     80   private final IncomingStreamHandler handler;
     81   private final Map<Integer, SpdyStream> streams = new HashMap<>();
     82   private final String hostName;
     83   private int lastGoodStreamId;
     84   private int nextStreamId;
     85   private boolean shutdown;
     86   private long idleStartTimeNs = System.nanoTime();
     87 
     88   /** Ensures push promise callbacks events are sent in order per stream. */
     89   private final ExecutorService pushExecutor;
     90 
     91   /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
     92   private Map<Integer, Ping> pings;
     93   /** User code to run in response to push promise events. */
     94   private final PushObserver pushObserver;
     95   private int nextPingId;
     96 
     97   /**
     98    * The total number of bytes consumed by the application, but not yet
     99    * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
    100    */
    101   // Visible for testing
    102   long unacknowledgedBytesRead = 0;
    103 
    104   /**
    105    * Count of bytes that can be written on the connection before receiving a
    106    * window update.
    107    */
    108   // Visible for testing
    109   long bytesLeftInWriteWindow;
    110 
    111   /** Settings we communicate to the peer. */
    112   // TODO: Do we want to dynamically adjust settings, or KISS and only set once?
    113   final Settings okHttpSettings = new Settings();
    114       // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
    115   private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
    116 
    117   /** Settings we receive from the peer. */
    118   // TODO: MWS will need to guard on this setting before attempting to push.
    119   final Settings peerSettings = new Settings();
    120 
    121   private boolean receivedInitialPeerSettings = false;
    122   final Variant variant;
    123   final Socket socket;
    124   final FrameWriter frameWriter;
    125 
    126   // Visible for testing
    127   final Reader readerRunnable;
    128 
    129   private SpdyConnection(Builder builder) throws IOException {
    130     protocol = builder.protocol;
    131     pushObserver = builder.pushObserver;
    132     client = builder.client;
    133     handler = builder.handler;
    134     // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
    135     nextStreamId = builder.client ? 1 : 2;
    136     if (builder.client && protocol == Protocol.HTTP_2) {
    137       nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
    138     }
    139 
    140     nextPingId = builder.client ? 1 : 2;
    141 
    142     // Flow control was designed more for servers, or proxies than edge clients.
    143     // If we are a client, set the flow control window to 16MiB.  This avoids
    144     // thrashing window updates every 64KiB, yet small enough to avoid blowing
    145     // up the heap.
    146     if (builder.client) {
    147       okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE);
    148     }
    149 
    150     hostName = builder.hostName;
    151 
    152     if (protocol == Protocol.HTTP_2) {
    153       variant = new Http2();
    154       // Like newSingleThreadExecutor, except lazy creates the thread.
    155       pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
    156           new LinkedBlockingQueue<Runnable>(),
    157           Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true));
    158       // 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2
    159       peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
    160       peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE);
    161     } else if (protocol == Protocol.SPDY_3) {
    162       variant = new Spdy3();
    163       pushExecutor = null;
    164     } else {
    165       throw new AssertionError(protocol);
    166     }
    167     bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
    168     socket = builder.socket;
    169     frameWriter = variant.newWriter(Okio.buffer(Okio.sink(builder.socket)), client);
    170 
    171     readerRunnable = new Reader();
    172     new Thread(readerRunnable).start(); // Not a daemon thread.
    173   }
    174 
    175   /** The protocol as selected using ALPN. */
    176   public Protocol getProtocol() {
    177     return protocol;
    178   }
    179 
    180   /**
    181    * Returns the number of {@link SpdyStream#isOpen() open streams} on this
    182    * connection.
    183    */
    184   public synchronized int openStreamCount() {
    185     return streams.size();
    186   }
    187 
    188   synchronized SpdyStream getStream(int id) {
    189     return streams.get(id);
    190   }
    191 
    192   synchronized SpdyStream removeStream(int streamId) {
    193     SpdyStream stream = streams.remove(streamId);
    194     if (stream != null && streams.isEmpty()) {
    195       setIdle(true);
    196     }
    197     notifyAll(); // The removed stream may be blocked on a connection-wide window update.
    198     return stream;
    199   }
    200 
    201   private synchronized void setIdle(boolean value) {
    202     idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE;
    203   }
    204 
    205   /** Returns true if this connection is idle. */
    206   public synchronized boolean isIdle() {
    207     return idleStartTimeNs != Long.MAX_VALUE;
    208   }
    209 
    210   /**
    211    * Returns the time in ns when this connection became idle or Long.MAX_VALUE
    212    * if connection is not idle.
    213    */
    214   public synchronized long getIdleStartTimeNs() {
    215     return idleStartTimeNs;
    216   }
    217 
    218   /**
    219    * Returns a new server-initiated stream.
    220    *
    221    * @param associatedStreamId the stream that triggered the sender to create
    222    *     this stream.
    223    * @param out true to create an output stream that we can use to send data
    224    *     to the remote peer. Corresponds to {@code FLAG_FIN}.
    225    */
    226   public SpdyStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
    227       throws IOException {
    228     if (client) throw new IllegalStateException("Client cannot push requests.");
    229     if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2");
    230     return newStream(associatedStreamId, requestHeaders, out, false);
    231   }
    232 
    233   /**
    234    * Returns a new locally-initiated stream.
    235    *
    236    * @param out true to create an output stream that we can use to send data to the remote peer.
    237    *     Corresponds to {@code FLAG_FIN}.
    238    * @param in true to create an input stream that the remote peer can use to send data to us.
    239    *     Corresponds to {@code FLAG_UNIDIRECTIONAL}.
    240    */
    241   public SpdyStream newStream(List<Header> requestHeaders, boolean out, boolean in)
    242       throws IOException {
    243     return newStream(0, requestHeaders, out, in);
    244   }
    245 
    246   private SpdyStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
    247       boolean in) throws IOException {
    248     boolean outFinished = !out;
    249     boolean inFinished = !in;
    250     SpdyStream stream;
    251     int streamId;
    252 
    253     synchronized (frameWriter) {
    254       synchronized (this) {
    255         if (shutdown) {
    256           throw new IOException("shutdown");
    257         }
    258         streamId = nextStreamId;
    259         nextStreamId += 2;
    260         stream = new SpdyStream(streamId, this, outFinished, inFinished, requestHeaders);
    261         if (stream.isOpen()) {
    262           streams.put(streamId, stream);
    263           setIdle(false);
    264         }
    265       }
    266       if (associatedStreamId == 0) {
    267         frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId,
    268             requestHeaders);
    269       } else if (client) {
    270         throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
    271       } else { // HTTP/2 has a PUSH_PROMISE frame.
    272         frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
    273       }
    274     }
    275 
    276     if (!out) {
    277       frameWriter.flush();
    278     }
    279 
    280     return stream;
    281   }
    282 
    283   void writeSynReply(int streamId, boolean outFinished, List<Header> alternating)
    284       throws IOException {
    285     frameWriter.synReply(outFinished, streamId, alternating);
    286   }
    287 
    288   /**
    289    * Callers of this method are not thread safe, and sometimes on application threads. Most often,
    290    * this method will be called to send a buffer worth of data to the peer.
    291    *
    292    * <p>Writes are subject to the write window of the stream and the connection. Until there is a
    293    * window sufficient to send {@code byteCount}, the caller will block. For example, a user of
    294    * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's
    295    * write window will block.
    296    *
    297    * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only
    298    * use case for zero {@code byteCount} is closing a flushed output stream.
    299    */
    300   public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
    301       throws IOException {
    302     if (byteCount == 0) { // Empty data frames are not flow-controlled.
    303       frameWriter.data(outFinished, streamId, buffer, 0);
    304       return;
    305     }
    306 
    307     while (byteCount > 0) {
    308       int toWrite;
    309       synchronized (SpdyConnection.this) {
    310         try {
    311           while (bytesLeftInWriteWindow <= 0) {
    312             // Before blocking, confirm that the stream we're writing is still open. It's possible
    313             // that the stream has since been closed (such as if this write timed out.)
    314             if (!streams.containsKey(streamId)) {
    315               throw new IOException("stream closed");
    316             }
    317             SpdyConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
    318           }
    319         } catch (InterruptedException e) {
    320           throw new InterruptedIOException();
    321         }
    322 
    323         toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
    324         toWrite = Math.min(toWrite, frameWriter.maxDataLength());
    325         bytesLeftInWriteWindow -= toWrite;
    326       }
    327 
    328       byteCount -= toWrite;
    329       frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
    330     }
    331   }
    332 
    333   /**
    334    * {@code delta} will be negative if a settings frame initial window is
    335    * smaller than the last.
    336    */
    337   void addBytesToWriteWindow(long delta) {
    338     bytesLeftInWriteWindow += delta;
    339     if (delta > 0) SpdyConnection.this.notifyAll();
    340   }
    341 
    342   void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
    343     executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
    344       @Override public void execute() {
    345         try {
    346           writeSynReset(streamId, errorCode);
    347         } catch (IOException ignored) {
    348         }
    349       }
    350     });
    351   }
    352 
    353   void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
    354     frameWriter.rstStream(streamId, statusCode);
    355   }
    356 
    357   void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
    358     executor.execute(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
    359       @Override public void execute() {
    360         try {
    361           frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
    362         } catch (IOException ignored) {
    363         }
    364       }
    365     });
    366   }
    367 
    368   /**
    369    * Sends a ping frame to the peer. Use the returned object to await the
    370    * ping's response and observe its round trip time.
    371    */
    372   public Ping ping() throws IOException {
    373     Ping ping = new Ping();
    374     int pingId;
    375     synchronized (this) {
    376       if (shutdown) {
    377         throw new IOException("shutdown");
    378       }
    379       pingId = nextPingId;
    380       nextPingId += 2;
    381       if (pings == null) pings = new HashMap<>();
    382       pings.put(pingId, ping);
    383     }
    384     writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping);
    385     return ping;
    386   }
    387 
    388   private void writePingLater(
    389       final boolean reply, final int payload1, final int payload2, final Ping ping) {
    390     executor.execute(new NamedRunnable("OkHttp %s ping %08x%08x",
    391         hostName, payload1, payload2) {
    392       @Override public void execute() {
    393         try {
    394           writePing(reply, payload1, payload2, ping);
    395         } catch (IOException ignored) {
    396         }
    397       }
    398     });
    399   }
    400 
    401   private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
    402     synchronized (frameWriter) {
    403       // Observe the sent time immediately before performing I/O.
    404       if (ping != null) ping.send();
    405       frameWriter.ping(reply, payload1, payload2);
    406     }
    407   }
    408 
    409   private synchronized Ping removePing(int id) {
    410     return pings != null ? pings.remove(id) : null;
    411   }
    412 
    413   public void flush() throws IOException {
    414     frameWriter.flush();
    415   }
    416 
    417   /**
    418    * Degrades this connection such that new streams can neither be created
    419    * locally, nor accepted from the remote peer. Existing streams are not
    420    * impacted. This is intended to permit an endpoint to gracefully stop
    421    * accepting new requests without harming previously established streams.
    422    */
    423   public void shutdown(ErrorCode statusCode) throws IOException {
    424     synchronized (frameWriter) {
    425       int lastGoodStreamId;
    426       synchronized (this) {
    427         if (shutdown) {
    428           return;
    429         }
    430         shutdown = true;
    431         lastGoodStreamId = this.lastGoodStreamId;
    432       }
    433       // TODO: propagate exception message into debugData
    434       frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
    435     }
    436   }
    437 
    438   /**
    439    * Closes this connection. This cancels all open streams and unanswered
    440    * pings. It closes the underlying input and output streams and shuts down
    441    * internal executor services.
    442    */
    443   @Override public void close() throws IOException {
    444     close(ErrorCode.NO_ERROR, ErrorCode.CANCEL);
    445   }
    446 
    447   private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
    448     assert (!Thread.holdsLock(this));
    449     IOException thrown = null;
    450     try {
    451       shutdown(connectionCode);
    452     } catch (IOException e) {
    453       thrown = e;
    454     }
    455 
    456     SpdyStream[] streamsToClose = null;
    457     Ping[] pingsToCancel = null;
    458     synchronized (this) {
    459       if (!streams.isEmpty()) {
    460         streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]);
    461         streams.clear();
    462         setIdle(false);
    463       }
    464       if (pings != null) {
    465         pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
    466         pings = null;
    467       }
    468     }
    469 
    470     if (streamsToClose != null) {
    471       for (SpdyStream stream : streamsToClose) {
    472         try {
    473           stream.close(streamCode);
    474         } catch (IOException e) {
    475           if (thrown != null) thrown = e;
    476         }
    477       }
    478     }
    479 
    480     if (pingsToCancel != null) {
    481       for (Ping ping : pingsToCancel) {
    482         ping.cancel();
    483       }
    484     }
    485 
    486     // Close the writer to release its resources (such as deflaters).
    487     try {
    488       frameWriter.close();
    489     } catch (IOException e) {
    490       if (thrown == null) thrown = e;
    491     }
    492 
    493     // Close the socket to break out the reader thread, which will clean up after itself.
    494     try {
    495       socket.close();
    496     } catch (IOException e) {
    497       thrown = e;
    498     }
    499 
    500     if (thrown != null) throw thrown;
    501   }
    502 
    503   /**
    504    * Sends a connection header if the current variant requires it. This should
    505    * be called after {@link Builder#build} for all new connections.
    506    */
    507   public void sendConnectionPreface() throws IOException {
    508     frameWriter.connectionPreface();
    509     frameWriter.settings(okHttpSettings);
    510     int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE);
    511     if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
    512       frameWriter.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
    513     }
    514   }
    515 
    516   public static class Builder {
    517     private String hostName;
    518     private Socket socket;
    519     private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
    520     private Protocol protocol = Protocol.SPDY_3;
    521     private PushObserver pushObserver = PushObserver.CANCEL;
    522     private boolean client;
    523 
    524     public Builder(boolean client, Socket socket) throws IOException {
    525       this(((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), client, socket);
    526     }
    527 
    528     /**
    529      * @param client true if this peer initiated the connection; false if this
    530      *     peer accepted the connection.
    531      */
    532     public Builder(String hostName, boolean client, Socket socket) throws IOException {
    533       this.hostName = hostName;
    534       this.client = client;
    535       this.socket = socket;
    536     }
    537 
    538     public Builder handler(IncomingStreamHandler handler) {
    539       this.handler = handler;
    540       return this;
    541     }
    542 
    543     public Builder protocol(Protocol protocol) {
    544       this.protocol = protocol;
    545       return this;
    546     }
    547 
    548     public Builder pushObserver(PushObserver pushObserver) {
    549       this.pushObserver = pushObserver;
    550       return this;
    551     }
    552 
    553     public SpdyConnection build() throws IOException {
    554       return new SpdyConnection(this);
    555     }
    556   }
    557 
    558   /**
    559    * Methods in this class must not lock FrameWriter.  If a method needs to
    560    * write a frame, create an async task to do so.
    561    */
    562   class Reader extends NamedRunnable implements FrameReader.Handler {
    563     FrameReader frameReader;
    564 
    565     private Reader() {
    566       super("OkHttp %s", hostName);
    567     }
    568 
    569     @Override protected void execute() {
    570       ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
    571       ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
    572       try {
    573         frameReader = variant.newReader(Okio.buffer(Okio.source(socket)), client);
    574         if (!client) {
    575           frameReader.readConnectionPreface();
    576         }
    577         while (frameReader.nextFrame(this)) {
    578         }
    579         connectionErrorCode = ErrorCode.NO_ERROR;
    580         streamErrorCode = ErrorCode.CANCEL;
    581       } catch (RuntimeException | IOException e) {
    582         connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
    583         streamErrorCode = ErrorCode.PROTOCOL_ERROR;
    584       } finally {
    585         try {
    586           close(connectionErrorCode, streamErrorCode);
    587         } catch (IOException ignored) {
    588         }
    589         Util.closeQuietly(frameReader);
    590       }
    591     }
    592 
    593     @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
    594         throws IOException {
    595       if (pushedStream(streamId)) {
    596         pushDataLater(streamId, source, length, inFinished);
    597         return;
    598       }
    599       SpdyStream dataStream = getStream(streamId);
    600       if (dataStream == null) {
    601         writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
    602         source.skip(length);
    603         return;
    604       }
    605       dataStream.receiveData(source, length);
    606       if (inFinished) {
    607         dataStream.receiveFin();
    608       }
    609     }
    610 
    611     @Override public void headers(boolean outFinished, boolean inFinished, int streamId,
    612         int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
    613       if (pushedStream(streamId)) {
    614         pushHeadersLater(streamId, headerBlock, inFinished);
    615         return;
    616       }
    617       SpdyStream stream;
    618       synchronized (SpdyConnection.this) {
    619         // If we're shutdown, don't bother with this stream.
    620         if (shutdown) return;
    621 
    622         stream = getStream(streamId);
    623 
    624         if (stream == null) {
    625           // The headers claim to be for an existing stream, but we don't have one.
    626           if (headersMode.failIfStreamAbsent()) {
    627             writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
    628             return;
    629           }
    630 
    631           // If the stream ID is less than the last created ID, assume it's already closed.
    632           if (streamId <= lastGoodStreamId) return;
    633 
    634           // If the stream ID is in the client's namespace, assume it's already closed.
    635           if (streamId % 2 == nextStreamId % 2) return;
    636 
    637           // Create a stream.
    638           final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished,
    639               inFinished, headerBlock);
    640           lastGoodStreamId = streamId;
    641           streams.put(streamId, newStream);
    642           executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
    643             @Override public void execute() {
    644               try {
    645                 handler.receive(newStream);
    646               } catch (RuntimeException | IOException e) {
    647                 try {
    648                   newStream.close(ErrorCode.PROTOCOL_ERROR);
    649                 } catch (IOException ignored) {
    650                 }
    651               }
    652             }
    653           });
    654           return;
    655         }
    656       }
    657 
    658       // The headers claim to be for a new stream, but we already have one.
    659       if (headersMode.failIfStreamPresent()) {
    660         stream.closeLater(ErrorCode.PROTOCOL_ERROR);
    661         removeStream(streamId);
    662         return;
    663       }
    664 
    665       // Update an existing stream.
    666       stream.receiveHeaders(headerBlock, headersMode);
    667       if (inFinished) stream.receiveFin();
    668     }
    669 
    670     @Override public void rstStream(int streamId, ErrorCode errorCode) {
    671       if (pushedStream(streamId)) {
    672         pushResetLater(streamId, errorCode);
    673         return;
    674       }
    675       SpdyStream rstStream = removeStream(streamId);
    676       if (rstStream != null) {
    677         rstStream.receiveRstStream(errorCode);
    678       }
    679     }
    680 
    681     @Override public void settings(boolean clearPrevious, Settings newSettings) {
    682       long delta = 0;
    683       SpdyStream[] streamsToNotify = null;
    684       synchronized (SpdyConnection.this) {
    685         int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
    686         if (clearPrevious) peerSettings.clear();
    687         peerSettings.merge(newSettings);
    688         if (getProtocol() == Protocol.HTTP_2) {
    689           ackSettingsLater(newSettings);
    690         }
    691         int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
    692         if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
    693           delta = peerInitialWindowSize - priorWriteWindowSize;
    694           if (!receivedInitialPeerSettings) {
    695             addBytesToWriteWindow(delta);
    696             receivedInitialPeerSettings = true;
    697           }
    698           if (!streams.isEmpty()) {
    699             streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
    700           }
    701         }
    702       }
    703       if (streamsToNotify != null && delta != 0) {
    704         for (SpdyStream stream : streamsToNotify) {
    705           synchronized (stream) {
    706             stream.addBytesToWriteWindow(delta);
    707           }
    708         }
    709       }
    710     }
    711 
    712     private void ackSettingsLater(final Settings peerSettings) {
    713       executor.execute(new NamedRunnable("OkHttp %s ACK Settings", hostName) {
    714         @Override public void execute() {
    715           try {
    716             frameWriter.ackSettings(peerSettings);
    717           } catch (IOException ignored) {
    718           }
    719         }
    720       });
    721     }
    722 
    723     @Override public void ackSettings() {
    724       // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
    725     }
    726 
    727     @Override public void ping(boolean reply, int payload1, int payload2) {
    728       if (reply) {
    729         Ping ping = removePing(payload1);
    730         if (ping != null) {
    731           ping.receive();
    732         }
    733       } else {
    734         // Send a reply to a client ping if this is a server and vice versa.
    735         writePingLater(true, payload1, payload2, null);
    736       }
    737     }
    738 
    739     @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
    740       if (debugData.size() > 0) { // TODO: log the debugData
    741       }
    742 
    743       // Copy the streams first. We don't want to hold a lock when we call receiveRstStream().
    744       SpdyStream[] streamsCopy;
    745       synchronized (SpdyConnection.this) {
    746         streamsCopy = streams.values().toArray(new SpdyStream[streams.size()]);
    747         shutdown = true;
    748       }
    749 
    750       // Fail all streams created after the last good stream ID.
    751       for (SpdyStream spdyStream : streamsCopy) {
    752         if (spdyStream.getId() > lastGoodStreamId && spdyStream.isLocallyInitiated()) {
    753           spdyStream.receiveRstStream(ErrorCode.REFUSED_STREAM);
    754           removeStream(spdyStream.getId());
    755         }
    756       }
    757     }
    758 
    759     @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
    760       if (streamId == 0) {
    761         synchronized (SpdyConnection.this) {
    762           bytesLeftInWriteWindow += windowSizeIncrement;
    763           SpdyConnection.this.notifyAll();
    764         }
    765       } else {
    766         SpdyStream stream = getStream(streamId);
    767         if (stream != null) {
    768           synchronized (stream) {
    769             stream.addBytesToWriteWindow(windowSizeIncrement);
    770           }
    771         }
    772       }
    773     }
    774 
    775     @Override public void priority(int streamId, int streamDependency, int weight,
    776         boolean exclusive) {
    777       // TODO: honor priority.
    778     }
    779 
    780     @Override
    781     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
    782       pushRequestLater(promisedStreamId, requestHeaders);
    783     }
    784 
    785     @Override public void alternateService(int streamId, String origin, ByteString protocol,
    786         String host, int port, long maxAge) {
    787       // TODO: register alternate service.
    788     }
    789   }
    790 
    791   /** Even, positive numbered streams are pushed streams in HTTP/2. */
    792   private boolean pushedStream(int streamId) {
    793     return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
    794   }
    795 
    796   // Guarded by this.
    797   private final Set<Integer> currentPushRequests = new LinkedHashSet<>();
    798 
    799   private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
    800     synchronized (this) {
    801       if (currentPushRequests.contains(streamId)) {
    802         writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
    803         return;
    804       }
    805       currentPushRequests.add(streamId);
    806     }
    807     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
    808       @Override public void execute() {
    809         boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
    810         try {
    811           if (cancel) {
    812             frameWriter.rstStream(streamId, ErrorCode.CANCEL);
    813             synchronized (SpdyConnection.this) {
    814               currentPushRequests.remove(streamId);
    815             }
    816           }
    817         } catch (IOException ignored) {
    818         }
    819       }
    820     });
    821   }
    822 
    823   private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
    824       final boolean inFinished) {
    825     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
    826       @Override public void execute() {
    827         boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
    828         try {
    829           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
    830           if (cancel || inFinished) {
    831             synchronized (SpdyConnection.this) {
    832               currentPushRequests.remove(streamId);
    833             }
    834           }
    835         } catch (IOException ignored) {
    836         }
    837       }
    838     });
    839   }
    840 
    841   /**
    842    * Eagerly reads {@code byteCount} bytes from the source before launching a background task to
    843    * process the data.  This avoids corrupting the stream.
    844    */
    845   private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
    846       final boolean inFinished) throws IOException {
    847     final Buffer buffer = new Buffer();
    848     source.require(byteCount); // Eagerly read the frame before firing client thread.
    849     source.read(buffer, byteCount);
    850     if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
    851     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
    852       @Override public void execute() {
    853         try {
    854           boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
    855           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
    856           if (cancel || inFinished) {
    857             synchronized (SpdyConnection.this) {
    858               currentPushRequests.remove(streamId);
    859             }
    860           }
    861         } catch (IOException ignored) {
    862         }
    863       }
    864     });
    865   }
    866 
    867   private void pushResetLater(final int streamId, final ErrorCode errorCode) {
    868     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
    869       @Override public void execute() {
    870         pushObserver.onReset(streamId, errorCode);
    871         synchronized (SpdyConnection.this) {
    872           currentPushRequests.remove(streamId);
    873         }
    874       }
    875     });
    876   }
    877 }
    878