Home | History | Annotate | Download | only in spdy
/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
     16 
     17 package libcore.net.spdy;
     18 
     19 import java.io.Closeable;
     20 import java.io.IOException;
     21 import java.io.InputStream;
     22 import java.io.OutputStream;
     23 import java.net.Socket;
     24 import java.util.Collections;
     25 import java.util.HashMap;
     26 import java.util.List;
     27 import java.util.Map;
     28 import java.util.concurrent.Executor;
     29 import java.util.concurrent.ExecutorService;
     30 import java.util.concurrent.Executors;
     31 
     32 /**
     33  * A socket connection to a remote peer. A connection hosts streams which can
     34  * send and receive data.
     35  */
     36 public final class SpdyConnection implements Closeable {
     37 
     38     /*
     39      * Socket writes are guarded by this. Socket reads are unguarded but are
     40      * only made by the reader thread.
     41      */
     42 
     43     static final int FLAG_FIN = 0x01;
     44     static final int FLAG_UNIDIRECTIONAL = 0x02;
     45 
     46     static final int TYPE_EOF = -1;
     47     static final int TYPE_DATA = 0x00;
     48     static final int TYPE_SYN_STREAM = 0x01;
     49     static final int TYPE_SYN_REPLY = 0x02;
     50     static final int TYPE_RST_STREAM = 0x03;
     51     static final int TYPE_SETTINGS = 0x04;
     52     static final int TYPE_NOOP = 0x05;
     53     static final int TYPE_PING = 0x06;
     54     static final int TYPE_GOAWAY = 0x07;
     55     static final int TYPE_HEADERS = 0x08;
     56     static final int VERSION = 2;
     57 
     58     /** Guarded by this. */
     59     private int nextStreamId;
     60     private final SpdyReader spdyReader;
     61     private final SpdyWriter spdyWriter;
     62     private final Executor executor;
     63 
     64     /**
     65      * User code to run in response to an incoming stream. This must not be run
     66      * on the read thread, otherwise a deadlock is possible.
     67      */
     68     private final IncomingStreamHandler handler;
     69 
     70     private final Map<Integer, SpdyStream> streams = Collections.synchronizedMap(
     71             new HashMap<Integer, SpdyStream>());
     72 
     73     private SpdyConnection(Builder builder) {
     74         nextStreamId = builder.client ? 1 : 2;
     75         spdyReader = new SpdyReader(builder.in);
     76         spdyWriter = new SpdyWriter(builder.out);
     77         handler = builder.handler;
     78 
     79         String name = isClient() ? "ClientReader" : "ServerReader";
     80         executor = builder.executor != null
     81                 ? builder.executor
     82                 : Executors.newCachedThreadPool(Threads.newThreadFactory(name));
     83         executor.execute(new Reader());
     84     }
     85 
     86     /**
     87      * Returns true if this peer initiated the connection.
     88      */
     89     public boolean isClient() {
     90         return nextStreamId % 2 == 1;
     91     }
     92 
     93     private SpdyStream getStream(int id) {
     94         SpdyStream stream = streams.get(id);
     95         if (stream == null) {
     96             throw new UnsupportedOperationException("TODO " + id + "; " + streams); // TODO: rst stream
     97         }
     98         return stream;
     99     }
    100 
    101     void removeStream(int streamId) {
    102         streams.remove(streamId);
    103     }
    104 
    105     /**
    106      * Returns a new locally-initiated stream.
    107      *
    108      * @param out true to create an output stream that we can use to send data
    109      *     to the remote peer. Corresponds to {@code FLAG_FIN}.
    110      * @param in true to create an input stream that the remote peer can use to
    111      *     send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}.
    112      */
    113     public synchronized SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in)
    114             throws IOException {
    115         int streamId = nextStreamId; // TODO
    116         nextStreamId += 2;
    117         int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL);
    118         int associatedStreamId = 0;  // TODO
    119         int priority = 0; // TODO
    120 
    121         SpdyStream result = new SpdyStream(streamId, this, requestHeaders, flags);
    122         streams.put(streamId, result);
    123 
    124         spdyWriter.flags = flags;
    125         spdyWriter.streamId = streamId;
    126         spdyWriter.associatedStreamId = associatedStreamId;
    127         spdyWriter.priority = priority;
    128         spdyWriter.nameValueBlock = requestHeaders;
    129         spdyWriter.synStream();
    130 
    131         return result;
    132     }
    133 
    134     synchronized void writeSynReply(int streamId, List<String> alternating) throws IOException {
    135         int flags = 0; // TODO
    136         spdyWriter.flags = flags;
    137         spdyWriter.streamId = streamId;
    138         spdyWriter.nameValueBlock = alternating;
    139         spdyWriter.synReply();
    140     }
    141 
    142     /** Writes a complete data frame. */
    143     synchronized void writeFrame(byte[] bytes, int offset, int length) throws IOException {
    144         spdyWriter.out.write(bytes, offset, length);
    145     }
    146 
    147     void writeSynResetLater(final int streamId, final int statusCode) {
    148         executor.execute(new Runnable() {
    149             @Override public void run() {
    150                 try {
    151                     writeSynReset(streamId, statusCode);
    152                 } catch (IOException ignored) {
    153                 }
    154             }
    155         });
    156     }
    157 
    158     synchronized void writeSynReset(int streamId, int statusCode) throws IOException {
    159         int flags = 0; // TODO
    160         spdyWriter.flags = flags;
    161         spdyWriter.streamId = streamId;
    162         spdyWriter.statusCode = statusCode;
    163         spdyWriter.synReset();
    164     }
    165 
    166     public synchronized void flush() throws IOException {
    167         spdyWriter.out.flush();
    168     }
    169 
    170     @Override public synchronized void close() throws IOException {
    171         // TODO: graceful close; send RST frames
    172         // TODO: close all streams to release waiting readers
    173         if (executor instanceof ExecutorService) {
    174             ((ExecutorService) executor).shutdown();
    175         }
    176     }
    177 
    178     public static class Builder {
    179         private InputStream in;
    180         private OutputStream out;
    181         private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
    182         private Executor executor;
    183         public boolean client;
    184 
    185         /**
    186          * @param client true if this peer initiated the connection; false if
    187          *     this peer accepted the connection.
    188          */
    189         public Builder(boolean client, Socket socket) throws IOException {
    190             this(client, socket.getInputStream(), socket.getOutputStream());
    191         }
    192 
    193         /**
    194          * @param client true if this peer initiated the connection; false if this
    195          *     peer accepted the connection.
    196          */
    197         public Builder(boolean client, InputStream in, OutputStream out) {
    198             this.client = client;
    199             this.in = in;
    200             this.out = out;
    201         }
    202 
    203         public Builder executor(Executor executor) {
    204             this.executor = executor;
    205             return this;
    206         }
    207 
    208         public Builder handler(IncomingStreamHandler handler) {
    209             this.handler = handler;
    210             return this;
    211         }
    212 
    213         public SpdyConnection build() {
    214             return new SpdyConnection(this);
    215         }
    216     }
    217 
    218     private class Reader implements Runnable {
    219         @Override public void run() {
    220             try {
    221                 while (readFrame()) {
    222                 }
    223                 close();
    224             } catch (Throwable e) {
    225                 e.printStackTrace(); // TODO
    226             }
    227         }
    228 
    229         private boolean readFrame() throws IOException {
    230             switch (spdyReader.nextFrame()) {
    231             case TYPE_EOF:
    232                 return false;
    233 
    234             case TYPE_DATA:
    235                 getStream(spdyReader.streamId)
    236                         .receiveData(spdyReader.in, spdyReader.flags, spdyReader.length);
    237                 return true;
    238 
    239             case TYPE_SYN_STREAM:
    240                 final SpdyStream stream = new SpdyStream(spdyReader.streamId, SpdyConnection.this,
    241                         spdyReader.nameValueBlock, spdyReader.flags);
    242                 SpdyStream previous = streams.put(spdyReader.streamId, stream);
    243                 if (previous != null) {
    244                     previous.close(SpdyStream.RST_PROTOCOL_ERROR);
    245                 }
    246                 executor.execute(new Runnable() {
    247                     @Override public void run() {
    248                         try {
    249                             handler.receive(stream);
    250                         } catch (IOException e) {
    251                             throw new RuntimeException(e);
    252                         }
    253                     }
    254                 });
    255                 return true;
    256 
    257             case TYPE_SYN_REPLY:
    258                 // TODO: honor flags
    259                 getStream(spdyReader.streamId).receiveReply(spdyReader.nameValueBlock);
    260                 return true;
    261 
    262             case TYPE_RST_STREAM:
    263                 getStream(spdyReader.streamId).receiveRstStream(spdyReader.statusCode);
    264                 return true;
    265 
    266             case SpdyConnection.TYPE_SETTINGS:
    267                 // TODO: implement
    268                 System.out.println("Unimplemented TYPE_SETTINGS frame discarded");
    269                 return true;
    270 
    271             case SpdyConnection.TYPE_NOOP:
    272             case SpdyConnection.TYPE_PING:
    273             case SpdyConnection.TYPE_GOAWAY:
    274             case SpdyConnection.TYPE_HEADERS:
    275                 throw new UnsupportedOperationException();
    276 
    277             default:
    278                 // TODO: throw IOException here?
    279                 return false;
    280             }
    281         }
    282     }
    283 }
    284