Home | History | Annotate | Download | only in spdy
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.squareup.okhttp.internal.spdy;
     18 
     19 import com.squareup.okhttp.internal.NamedRunnable;
     20 import com.squareup.okhttp.internal.Util;
     21 import java.io.Closeable;
     22 import java.io.IOException;
     23 import java.io.InputStream;
     24 import java.io.OutputStream;
     25 import java.net.Socket;
     26 import java.util.HashMap;
     27 import java.util.Iterator;
     28 import java.util.List;
     29 import java.util.Map;
     30 import java.util.concurrent.ExecutorService;
     31 import java.util.concurrent.SynchronousQueue;
     32 import java.util.concurrent.ThreadPoolExecutor;
     33 import java.util.concurrent.TimeUnit;
     34 
     35 /**
     36  * A socket connection to a remote peer. A connection hosts streams which can
     37  * send and receive data.
     38  *
     39  * <p>Many methods in this API are <strong>synchronous:</strong> the call is
     40  * completed before the method returns. This is typical for Java but atypical
     41  * for SPDY. This is motivated by exception transparency: an IOException that
     42  * was triggered by a certain caller can be caught and handled by that caller.
     43  */
     44 public final class SpdyConnection implements Closeable {
     45 
     46   // Internal state of this connection is guarded by 'this'. No blocking
     47   // operations may be performed while holding this lock!
     48   //
     49   // Socket writes are guarded by spdyWriter.
     50   //
     51   // Socket reads are unguarded but are only made by the reader thread.
     52   //
     53   // Certain operations (like SYN_STREAM) need to synchronize on both the
     54   // spdyWriter (to do blocking I/O) and this (to create streams). Such
     55   // operations must synchronize on 'this' last. This ensures that we never
     56   // wait for a blocking operation while holding 'this'.
     57 
     58   static final int FLAG_FIN = 0x1;
     59   static final int FLAG_UNIDIRECTIONAL = 0x2;
     60 
     61   static final int TYPE_DATA = 0x0;
     62   static final int TYPE_SYN_STREAM = 0x1;
     63   static final int TYPE_SYN_REPLY = 0x2;
     64   static final int TYPE_RST_STREAM = 0x3;
     65   static final int TYPE_SETTINGS = 0x4;
     66   static final int TYPE_NOOP = 0x5;
     67   static final int TYPE_PING = 0x6;
     68   static final int TYPE_GOAWAY = 0x7;
     69   static final int TYPE_HEADERS = 0x8;
     70   static final int TYPE_WINDOW_UPDATE = 0x9;
     71   static final int TYPE_CREDENTIAL = 0x10;
     72   static final int VERSION = 3;
     73 
     74   static final int GOAWAY_OK = 0;
     75   static final int GOAWAY_PROTOCOL_ERROR = 1;
     76   static final int GOAWAY_INTERNAL_ERROR = 2;
     77 
     78   private static final ExecutorService executor = new ThreadPoolExecutor(0,
     79       Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
     80       Util.daemonThreadFactory("OkHttp SpdyConnection"));
     81 
     82   /** True if this peer initiated the connection. */
     83   final boolean client;
     84 
     85   /**
     86    * User code to run in response to an incoming stream. Callbacks must not be
     87    * run on the callback executor.
     88    */
     89   private final IncomingStreamHandler handler;
     90   private final SpdyReader spdyReader;
     91   private final SpdyWriter spdyWriter;
     92 
     93   private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>();
     94   private final String hostName;
     95   private int lastGoodStreamId;
     96   private int nextStreamId;
     97   private boolean shutdown;
     98   private long idleStartTimeNs = System.nanoTime();
     99 
    100   /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
    101   private Map<Integer, Ping> pings;
    102   private int nextPingId;
    103 
    104   /** Lazily-created settings for this connection. */
    105   Settings settings;
    106 
    107   private SpdyConnection(Builder builder) {
    108     client = builder.client;
    109     handler = builder.handler;
    110     spdyReader = new SpdyReader(builder.in);
    111     spdyWriter = new SpdyWriter(builder.out);
    112     nextStreamId = builder.client ? 1 : 2;
    113     nextPingId = builder.client ? 1 : 2;
    114 
    115     hostName = builder.hostName;
    116 
    117     new Thread(new Reader(), "Spdy Reader " + hostName).start();
    118   }
    119 
    120   /**
    121    * Returns the number of {@link SpdyStream#isOpen() open streams} on this
    122    * connection.
    123    */
    124   public synchronized int openStreamCount() {
    125     return streams.size();
    126   }
    127 
    128   private synchronized SpdyStream getStream(int id) {
    129     return streams.get(id);
    130   }
    131 
    132   synchronized SpdyStream removeStream(int streamId) {
    133     SpdyStream stream = streams.remove(streamId);
    134     if (stream != null && streams.isEmpty()) {
    135       setIdle(true);
    136     }
    137     return stream;
    138   }
    139 
    140   private synchronized void setIdle(boolean value) {
    141     idleStartTimeNs = value ? System.nanoTime() : 0L;
    142   }
    143 
    144   /** Returns true if this connection is idle. */
    145   public synchronized boolean isIdle() {
    146     return idleStartTimeNs != 0L;
    147   }
    148 
    149   /** Returns the time in ns when this connection became idle or 0L if connection is not idle. */
    150   public synchronized long getIdleStartTimeNs() {
    151     return idleStartTimeNs;
    152   }
    153 
    154   /**
    155    * Returns a new locally-initiated stream.
    156    *
    157    * @param out true to create an output stream that we can use to send data
    158    * to the remote peer. Corresponds to {@code FLAG_FIN}.
    159    * @param in true to create an input stream that the remote peer can use to
    160    * send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}.
    161    */
    162   public SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in)
    163       throws IOException {
    164     int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL);
    165     int associatedStreamId = 0;  // TODO: permit the caller to specify an associated stream?
    166     int priority = 0; // TODO: permit the caller to specify a priority?
    167     int slot = 0; // TODO: permit the caller to specify a slot?
    168     SpdyStream stream;
    169     int streamId;
    170 
    171     synchronized (spdyWriter) {
    172       synchronized (this) {
    173         if (shutdown) {
    174           throw new IOException("shutdown");
    175         }
    176         streamId = nextStreamId;
    177         nextStreamId += 2;
    178         stream = new SpdyStream(streamId, this, flags, priority, slot, requestHeaders, settings);
    179         if (stream.isOpen()) {
    180           streams.put(streamId, stream);
    181           setIdle(false);
    182         }
    183       }
    184 
    185       spdyWriter.synStream(flags, streamId, associatedStreamId, priority, slot, requestHeaders);
    186     }
    187 
    188     return stream;
    189   }
    190 
    191   void writeSynReply(int streamId, int flags, List<String> alternating) throws IOException {
    192     spdyWriter.synReply(flags, streamId, alternating);
    193   }
    194 
    195   /** Writes a complete data frame. */
    196   void writeFrame(byte[] bytes, int offset, int length) throws IOException {
    197     synchronized (spdyWriter) {
    198       spdyWriter.out.write(bytes, offset, length);
    199     }
    200   }
    201 
    202   void writeSynResetLater(final int streamId, final int statusCode) {
    203     executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) {
    204       @Override public void execute() {
    205         try {
    206           writeSynReset(streamId, statusCode);
    207         } catch (IOException ignored) {
    208         }
    209       }
    210     });
    211   }
    212 
    213   void writeSynReset(int streamId, int statusCode) throws IOException {
    214     spdyWriter.rstStream(streamId, statusCode);
    215   }
    216 
    217   void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) {
    218     executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) {
    219       @Override public void execute() {
    220         try {
    221           writeWindowUpdate(streamId, deltaWindowSize);
    222         } catch (IOException ignored) {
    223         }
    224       }
    225     });
    226   }
    227 
    228   void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException {
    229     spdyWriter.windowUpdate(streamId, deltaWindowSize);
    230   }
    231 
    232   /**
    233    * Sends a ping frame to the peer. Use the returned object to await the
    234    * ping's response and observe its round trip time.
    235    */
    236   public Ping ping() throws IOException {
    237     Ping ping = new Ping();
    238     int pingId;
    239     synchronized (this) {
    240       if (shutdown) {
    241         throw new IOException("shutdown");
    242       }
    243       pingId = nextPingId;
    244       nextPingId += 2;
    245       if (pings == null) pings = new HashMap<Integer, Ping>();
    246       pings.put(pingId, ping);
    247     }
    248     writePing(pingId, ping);
    249     return ping;
    250   }
    251 
    252   private void writePingLater(final int streamId, final Ping ping) {
    253     executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %d", hostName, streamId) {
    254       @Override public void execute() {
    255         try {
    256           writePing(streamId, ping);
    257         } catch (IOException ignored) {
    258         }
    259       }
    260     });
    261   }
    262 
    263   private void writePing(int id, Ping ping) throws IOException {
    264     synchronized (spdyWriter) {
    265       // Observe the sent time immediately before performing I/O.
    266       if (ping != null) ping.send();
    267       spdyWriter.ping(0, id);
    268     }
    269   }
    270 
    271   private synchronized Ping removePing(int id) {
    272     return pings != null ? pings.remove(id) : null;
    273   }
    274 
    275   /** Sends a noop frame to the peer. */
    276   public void noop() throws IOException {
    277     spdyWriter.noop();
    278   }
    279 
    280   public void flush() throws IOException {
    281     synchronized (spdyWriter) {
    282       spdyWriter.out.flush();
    283     }
    284   }
    285 
    286   /**
    287    * Degrades this connection such that new streams can neither be created
    288    * locally, nor accepted from the remote peer. Existing streams are not
    289    * impacted. This is intended to permit an endpoint to gracefully stop
    290    * accepting new requests without harming previously established streams.
    291    *
    292    * @param statusCode one of {@link #GOAWAY_OK}, {@link
    293    * #GOAWAY_INTERNAL_ERROR} or {@link #GOAWAY_PROTOCOL_ERROR}.
    294    */
    295   public void shutdown(int statusCode) throws IOException {
    296     synchronized (spdyWriter) {
    297       int lastGoodStreamId;
    298       synchronized (this) {
    299         if (shutdown) {
    300           return;
    301         }
    302         shutdown = true;
    303         lastGoodStreamId = this.lastGoodStreamId;
    304       }
    305       spdyWriter.goAway(0, lastGoodStreamId, statusCode);
    306     }
    307   }
    308 
    309   /**
    310    * Closes this connection. This cancels all open streams and unanswered
    311    * pings. It closes the underlying input and output streams and shuts down
    312    * internal executor services.
    313    */
    314   @Override public void close() throws IOException {
    315     close(GOAWAY_OK, SpdyStream.RST_CANCEL);
    316   }
    317 
    318   private void close(int shutdownStatusCode, int rstStatusCode) throws IOException {
    319     assert (!Thread.holdsLock(this));
    320     IOException thrown = null;
    321     try {
    322       shutdown(shutdownStatusCode);
    323     } catch (IOException e) {
    324       thrown = e;
    325     }
    326 
    327     SpdyStream[] streamsToClose = null;
    328     Ping[] pingsToCancel = null;
    329     synchronized (this) {
    330       if (!streams.isEmpty()) {
    331         streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]);
    332         streams.clear();
    333         setIdle(false);
    334       }
    335       if (pings != null) {
    336         pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
    337         pings = null;
    338       }
    339     }
    340 
    341     if (streamsToClose != null) {
    342       for (SpdyStream stream : streamsToClose) {
    343         try {
    344           stream.close(rstStatusCode);
    345         } catch (IOException e) {
    346           if (thrown != null) thrown = e;
    347         }
    348       }
    349     }
    350 
    351     if (pingsToCancel != null) {
    352       for (Ping ping : pingsToCancel) {
    353         ping.cancel();
    354       }
    355     }
    356 
    357     try {
    358       spdyReader.close();
    359     } catch (IOException e) {
    360       thrown = e;
    361     }
    362     try {
    363       spdyWriter.close();
    364     } catch (IOException e) {
    365       if (thrown == null) thrown = e;
    366     }
    367 
    368     if (thrown != null) throw thrown;
    369   }
    370 
    371   public static class Builder {
    372     private String hostName;
    373     private InputStream in;
    374     private OutputStream out;
    375     private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
    376     public boolean client;
    377 
    378     public Builder(boolean client, Socket socket) throws IOException {
    379       this("", client, socket.getInputStream(), socket.getOutputStream());
    380     }
    381 
    382     public Builder(boolean client, InputStream in, OutputStream out) {
    383       this("", client, in, out);
    384     }
    385 
    386     /**
    387      * @param client true if this peer initiated the connection; false if
    388      * this peer accepted the connection.
    389      */
    390     public Builder(String hostName, boolean client, Socket socket) throws IOException {
    391       this(hostName, client, socket.getInputStream(), socket.getOutputStream());
    392     }
    393 
    394     /**
    395      * @param client true if this peer initiated the connection; false if this
    396      * peer accepted the connection.
    397      */
    398     public Builder(String hostName, boolean client, InputStream in, OutputStream out) {
    399       this.hostName = hostName;
    400       this.client = client;
    401       this.in = in;
    402       this.out = out;
    403     }
    404 
    405     public Builder handler(IncomingStreamHandler handler) {
    406       this.handler = handler;
    407       return this;
    408     }
    409 
    410     public SpdyConnection build() {
    411       return new SpdyConnection(this);
    412     }
    413   }
    414 
    415   private class Reader implements Runnable, SpdyReader.Handler {
    416     @Override public void run() {
    417       int shutdownStatusCode = GOAWAY_INTERNAL_ERROR;
    418       int rstStatusCode = SpdyStream.RST_INTERNAL_ERROR;
    419       try {
    420         while (spdyReader.nextFrame(this)) {
    421         }
    422         shutdownStatusCode = GOAWAY_OK;
    423         rstStatusCode = SpdyStream.RST_CANCEL;
    424       } catch (IOException e) {
    425         shutdownStatusCode = GOAWAY_PROTOCOL_ERROR;
    426         rstStatusCode = SpdyStream.RST_PROTOCOL_ERROR;
    427       } finally {
    428         try {
    429           close(shutdownStatusCode, rstStatusCode);
    430         } catch (IOException ignored) {
    431         }
    432       }
    433     }
    434 
    435     @Override public void data(int flags, int streamId, InputStream in, int length)
    436         throws IOException {
    437       SpdyStream dataStream = getStream(streamId);
    438       if (dataStream == null) {
    439         writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
    440         Util.skipByReading(in, length);
    441         return;
    442       }
    443       dataStream.receiveData(in, length);
    444       if ((flags & SpdyConnection.FLAG_FIN) != 0) {
    445         dataStream.receiveFin();
    446       }
    447     }
    448 
    449     @Override public void synStream(int flags, int streamId, int associatedStreamId, int priority,
    450         int slot, List<String> nameValueBlock) {
    451       final SpdyStream synStream;
    452       final SpdyStream previous;
    453       synchronized (SpdyConnection.this) {
    454         synStream =
    455             new SpdyStream(streamId, SpdyConnection.this, flags, priority, slot, nameValueBlock,
    456                 settings);
    457         if (shutdown) {
    458           return;
    459         }
    460         lastGoodStreamId = streamId;
    461         previous = streams.put(streamId, synStream);
    462       }
    463       if (previous != null) {
    464         previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
    465         removeStream(streamId);
    466         return;
    467       }
    468 
    469       executor.submit(new NamedRunnable("OkHttp SPDY Callback %s stream %d", hostName, streamId) {
    470         @Override public void execute() {
    471           try {
    472             handler.receive(synStream);
    473           } catch (IOException e) {
    474             throw new RuntimeException(e);
    475           }
    476         }
    477       });
    478     }
    479 
    480     @Override public void synReply(int flags, int streamId, List<String> nameValueBlock)
    481         throws IOException {
    482       SpdyStream replyStream = getStream(streamId);
    483       if (replyStream == null) {
    484         writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
    485         return;
    486       }
    487       replyStream.receiveReply(nameValueBlock);
    488       if ((flags & SpdyConnection.FLAG_FIN) != 0) {
    489         replyStream.receiveFin();
    490       }
    491     }
    492 
    493     @Override public void headers(int flags, int streamId, List<String> nameValueBlock)
    494         throws IOException {
    495       SpdyStream replyStream = getStream(streamId);
    496       if (replyStream != null) {
    497         replyStream.receiveHeaders(nameValueBlock);
    498       }
    499     }
    500 
    501     @Override public void rstStream(int flags, int streamId, int statusCode) {
    502       SpdyStream rstStream = removeStream(streamId);
    503       if (rstStream != null) {
    504         rstStream.receiveRstStream(statusCode);
    505       }
    506     }
    507 
    508     @Override public void settings(int flags, Settings newSettings) {
    509       SpdyStream[] streamsToNotify = null;
    510       synchronized (SpdyConnection.this) {
    511         if (settings == null || (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) {
    512           settings = newSettings;
    513         } else {
    514           settings.merge(newSettings);
    515         }
    516         if (!streams.isEmpty()) {
    517           streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
    518         }
    519       }
    520       if (streamsToNotify != null) {
    521         for (SpdyStream stream : streamsToNotify) {
    522           // The synchronization here is ugly. We need to synchronize on 'this' to guard
    523           // reads to 'settings'. We synchronize on 'stream' to guard the state change.
    524           // And we need to acquire the 'stream' lock first, since that may block.
    525           synchronized (stream) {
    526             synchronized (SpdyConnection.this) {
    527               stream.receiveSettings(settings);
    528             }
    529           }
    530         }
    531       }
    532     }
    533 
    534     @Override public void noop() {
    535     }
    536 
    537     @Override public void ping(int flags, int streamId) {
    538       if (client != (streamId % 2 == 1)) {
    539         // Respond to a client ping if this is a server and vice versa.
    540         writePingLater(streamId, null);
    541       } else {
    542         Ping ping = removePing(streamId);
    543         if (ping != null) {
    544           ping.receive();
    545         }
    546       }
    547     }
    548 
    549     @Override public void goAway(int flags, int lastGoodStreamId, int statusCode) {
    550       synchronized (SpdyConnection.this) {
    551         shutdown = true;
    552 
    553         // Fail all streams created after the last good stream ID.
    554         for (Iterator<Map.Entry<Integer, SpdyStream>> i = streams.entrySet().iterator();
    555             i.hasNext(); ) {
    556           Map.Entry<Integer, SpdyStream> entry = i.next();
    557           int streamId = entry.getKey();
    558           if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) {
    559             entry.getValue().receiveRstStream(SpdyStream.RST_REFUSED_STREAM);
    560             i.remove();
    561           }
    562         }
    563       }
    564     }
    565 
    566     @Override public void windowUpdate(int flags, int streamId, int deltaWindowSize) {
    567       SpdyStream stream = getStream(streamId);
    568       if (stream != null) {
    569         stream.receiveWindowUpdate(deltaWindowSize);
    570       }
    571     }
    572   }
    573 }
    574