1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package libcore.net.spdy; 18 19 import java.io.Closeable; 20 import java.io.IOException; 21 import java.io.InputStream; 22 import java.io.OutputStream; 23 import java.net.Socket; 24 import java.util.Collections; 25 import java.util.HashMap; 26 import java.util.List; 27 import java.util.Map; 28 import java.util.concurrent.Executor; 29 import java.util.concurrent.ExecutorService; 30 import java.util.concurrent.Executors; 31 32 /** 33 * A socket connection to a remote peer. A connection hosts streams which can 34 * send and receive data. 35 */ 36 public final class SpdyConnection implements Closeable { 37 38 /* 39 * Socket writes are guarded by this. Socket reads are unguarded but are 40 * only made by the reader thread. 41 */ 42 43 static final int FLAG_FIN = 0x01; 44 static final int FLAG_UNIDIRECTIONAL = 0x02; 45 46 static final int TYPE_EOF = -1; 47 static final int TYPE_DATA = 0x00; 48 static final int TYPE_SYN_STREAM = 0x01; 49 static final int TYPE_SYN_REPLY = 0x02; 50 static final int TYPE_RST_STREAM = 0x03; 51 static final int TYPE_SETTINGS = 0x04; 52 static final int TYPE_NOOP = 0x05; 53 static final int TYPE_PING = 0x06; 54 static final int TYPE_GOAWAY = 0x07; 55 static final int TYPE_HEADERS = 0x08; 56 static final int VERSION = 2; 57 58 /** Guarded by this. */ 59 private int nextStreamId; 60 private final SpdyReader spdyReader; 61 private final SpdyWriter spdyWriter; 62 private final Executor executor; 63 64 /** 65 * User code to run in response to an incoming stream. This must not be run 66 * on the read thread, otherwise a deadlock is possible. 67 */ 68 private final IncomingStreamHandler handler; 69 70 private final Map<Integer, SpdyStream> streams = Collections.synchronizedMap( 71 new HashMap<Integer, SpdyStream>()); 72 73 private SpdyConnection(Builder builder) { 74 nextStreamId = builder.client ? 1 : 2; 75 spdyReader = new SpdyReader(builder.in); 76 spdyWriter = new SpdyWriter(builder.out); 77 handler = builder.handler; 78 79 String name = isClient() ? "ClientReader" : "ServerReader"; 80 executor = builder.executor != null 81 ? builder.executor 82 : Executors.newCachedThreadPool(Threads.newThreadFactory(name)); 83 executor.execute(new Reader()); 84 } 85 86 /** 87 * Returns true if this peer initiated the connection. 88 */ 89 public boolean isClient() { 90 return nextStreamId % 2 == 1; 91 } 92 93 private SpdyStream getStream(int id) { 94 SpdyStream stream = streams.get(id); 95 if (stream == null) { 96 throw new UnsupportedOperationException("TODO " + id + "; " + streams); // TODO: rst stream 97 } 98 return stream; 99 } 100 101 void removeStream(int streamId) { 102 streams.remove(streamId); 103 } 104 105 /** 106 * Returns a new locally-initiated stream. 107 * 108 * @param out true to create an output stream that we can use to send data 109 * to the remote peer. Corresponds to {@code FLAG_FIN}. 110 * @param in true to create an input stream that the remote peer can use to 111 * send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}. 112 */ 113 public synchronized SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in) 114 throws IOException { 115 int streamId = nextStreamId; // TODO 116 nextStreamId += 2; 117 int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL); 118 int associatedStreamId = 0; // TODO 119 int priority = 0; // TODO 120 121 SpdyStream result = new SpdyStream(streamId, this, requestHeaders, flags); 122 streams.put(streamId, result); 123 124 spdyWriter.flags = flags; 125 spdyWriter.streamId = streamId; 126 spdyWriter.associatedStreamId = associatedStreamId; 127 spdyWriter.priority = priority; 128 spdyWriter.nameValueBlock = requestHeaders; 129 spdyWriter.synStream(); 130 131 return result; 132 } 133 134 synchronized void writeSynReply(int streamId, List<String> alternating) throws IOException { 135 int flags = 0; // TODO 136 spdyWriter.flags = flags; 137 spdyWriter.streamId = streamId; 138 spdyWriter.nameValueBlock = alternating; 139 spdyWriter.synReply(); 140 } 141 142 /** Writes a complete data frame. */ 143 synchronized void writeFrame(byte[] bytes, int offset, int length) throws IOException { 144 spdyWriter.out.write(bytes, offset, length); 145 } 146 147 void writeSynResetLater(final int streamId, final int statusCode) { 148 executor.execute(new Runnable() { 149 @Override public void run() { 150 try { 151 writeSynReset(streamId, statusCode); 152 } catch (IOException ignored) { 153 } 154 } 155 }); 156 } 157 158 synchronized void writeSynReset(int streamId, int statusCode) throws IOException { 159 int flags = 0; // TODO 160 spdyWriter.flags = flags; 161 spdyWriter.streamId = streamId; 162 spdyWriter.statusCode = statusCode; 163 spdyWriter.synReset(); 164 } 165 166 public synchronized void flush() throws IOException { 167 spdyWriter.out.flush(); 168 } 169 170 @Override public synchronized void close() throws IOException { 171 // TODO: graceful close; send RST frames 172 // TODO: close all streams to release waiting readers 173 if (executor instanceof ExecutorService) { 174 ((ExecutorService) executor).shutdown(); 175 } 176 } 177 178 public static class Builder { 179 private InputStream in; 180 private OutputStream out; 181 private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; 182 private Executor executor; 183 public boolean client; 184 185 /** 186 * @param client true if this peer initiated the connection; false if 187 * this peer accepted the connection. 188 */ 189 public Builder(boolean client, Socket socket) throws IOException { 190 this(client, socket.getInputStream(), socket.getOutputStream()); 191 } 192 193 /** 194 * @param client true if this peer initiated the connection; false if this 195 * peer accepted the connection. 196 */ 197 public Builder(boolean client, InputStream in, OutputStream out) { 198 this.client = client; 199 this.in = in; 200 this.out = out; 201 } 202 203 public Builder executor(Executor executor) { 204 this.executor = executor; 205 return this; 206 } 207 208 public Builder handler(IncomingStreamHandler handler) { 209 this.handler = handler; 210 return this; 211 } 212 213 public SpdyConnection build() { 214 return new SpdyConnection(this); 215 } 216 } 217 218 private class Reader implements Runnable { 219 @Override public void run() { 220 try { 221 while (readFrame()) { 222 } 223 close(); 224 } catch (Throwable e) { 225 e.printStackTrace(); // TODO 226 } 227 } 228 229 private boolean readFrame() throws IOException { 230 switch (spdyReader.nextFrame()) { 231 case TYPE_EOF: 232 return false; 233 234 case TYPE_DATA: 235 getStream(spdyReader.streamId) 236 .receiveData(spdyReader.in, spdyReader.flags, spdyReader.length); 237 return true; 238 239 case TYPE_SYN_STREAM: 240 final SpdyStream stream = new SpdyStream(spdyReader.streamId, SpdyConnection.this, 241 spdyReader.nameValueBlock, spdyReader.flags); 242 SpdyStream previous = streams.put(spdyReader.streamId, stream); 243 if (previous != null) { 244 previous.close(SpdyStream.RST_PROTOCOL_ERROR); 245 } 246 executor.execute(new Runnable() { 247 @Override public void run() { 248 try { 249 handler.receive(stream); 250 } catch (IOException e) { 251 throw new RuntimeException(e); 252 } 253 } 254 }); 255 return true; 256 257 case TYPE_SYN_REPLY: 258 // TODO: honor flags 259 getStream(spdyReader.streamId).receiveReply(spdyReader.nameValueBlock); 260 return true; 261 262 case TYPE_RST_STREAM: 263 getStream(spdyReader.streamId).receiveRstStream(spdyReader.statusCode); 264 return true; 265 266 case SpdyConnection.TYPE_SETTINGS: 267 // TODO: implement 268 System.out.println("Unimplemented TYPE_SETTINGS frame discarded"); 269 return true; 270 271 case SpdyConnection.TYPE_NOOP: 272 case SpdyConnection.TYPE_PING: 273 case SpdyConnection.TYPE_GOAWAY: 274 case SpdyConnection.TYPE_HEADERS: 275 throw new UnsupportedOperationException(); 276 277 default: 278 // TODO: throw IOException here? 279 return false; 280 } 281 } 282 } 283 } 284