/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.squareup.okhttp.internal.spdy;

import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.Util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A socket connection to a remote peer. A connection hosts streams which can
 * send and receive data.
 *
 * <p>Many methods in this API are <strong>synchronous:</strong> the call is
 * completed before the method returns. This is typical for Java but atypical
 * for SPDY. This is motivated by exception transparency: an IOException that
 * was triggered by a certain caller can be caught and handled by that caller.
 */
public final class SpdyConnection implements Closeable {

  // Internal state of this connection is guarded by 'this'. No blocking
  // operations may be performed while holding this lock!
  //
  // Socket writes are guarded by spdyWriter.
  //
  // Socket reads are unguarded but are only made by the reader thread.
  //
  // Certain operations (like SYN_STREAM) need to synchronize on both the
  // spdyWriter (to do blocking I/O) and this (to create streams). Such
  // operations must synchronize on 'this' last. This ensures that we never
  // wait for a blocking operation while holding 'this'.

  static final int FLAG_FIN = 0x1;
  static final int FLAG_UNIDIRECTIONAL = 0x2;

  static final int TYPE_DATA = 0x0;
  static final int TYPE_SYN_STREAM = 0x1;
  static final int TYPE_SYN_REPLY = 0x2;
  static final int TYPE_RST_STREAM = 0x3;
  static final int TYPE_SETTINGS = 0x4;
  static final int TYPE_NOOP = 0x5;
  static final int TYPE_PING = 0x6;
  static final int TYPE_GOAWAY = 0x7;
  static final int TYPE_HEADERS = 0x8;
  static final int TYPE_WINDOW_UPDATE = 0x9;
  // NOTE(review): 0x10 is 16 decimal, whereas the other frame types above are
  // numbered sequentially (0x0-0x9). Confirm against the SPDY/3 specification
  // and SpdyReader whether this was meant to be 0xa before relying on it.
  static final int TYPE_CREDENTIAL = 0x10;
  static final int VERSION = 3;

  static final int GOAWAY_OK = 0;
  static final int GOAWAY_PROTOCOL_ERROR = 1;
  static final int GOAWAY_INTERNAL_ERROR = 2;

  /** Shared executor used for deferred frame writes and incoming-stream callbacks. */
  private static final ExecutorService executor = new ThreadPoolExecutor(0,
      Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
      Util.daemonThreadFactory("OkHttp SpdyConnection"));

  /** True if this peer initiated the connection. */
  final boolean client;

  /**
   * User code to run in response to an incoming stream. Callbacks must not be
   * run on the callback executor.
   */
  private final IncomingStreamHandler handler;
  private final SpdyReader spdyReader;
  private final SpdyWriter spdyWriter;

  /** Streams currently registered on this connection, keyed by stream ID. Guarded by this. */
  private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>();
  private final String hostName;
  /** ID of the most recently accepted remotely-initiated stream; sent with GOAWAY. */
  private int lastGoodStreamId;
  /** ID for the next locally-initiated stream; advances by two per stream. */
  private int nextStreamId;
  /** True once a GOAWAY has been sent or received. Guarded by this. */
  private boolean shutdown;
  /** Time at which this connection became idle, or 0L while it is not idle. */
  private long idleStartTimeNs = System.nanoTime();

  /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
  private Map<Integer, Ping> pings;
  private int nextPingId;

  /** Lazily-created settings for this connection. */
  Settings settings;

  /**
   * Creates the connection from {@code builder} and starts the thread that
   * reads incoming frames.
   */
  private SpdyConnection(Builder builder) {
    client = builder.client;
    handler = builder.handler;
    spdyReader = new SpdyReader(builder.in);
    spdyWriter = new SpdyWriter(builder.out);
    // Clients use odd IDs and servers use even IDs for both streams and pings.
    nextStreamId = builder.client ? 1 : 2;
    nextPingId = builder.client ? 1 : 2;

    hostName = builder.hostName;

    new Thread(new Reader(), "Spdy Reader " + hostName).start();
  }

  /**
   * Returns the number of {@link SpdyStream#isOpen() open streams} on this
   * connection.
   */
  public synchronized int openStreamCount() {
    return streams.size();
  }

  /** Returns the registered stream with the given ID, or null if there is none. */
  private synchronized SpdyStream getStream(int id) {
    return streams.get(id);
  }

  /**
   * Unregisters and returns the stream with the given ID, or null if it was not
   * registered. The connection is marked idle when the last stream is removed.
   */
  synchronized SpdyStream removeStream(int streamId) {
    SpdyStream stream = streams.remove(streamId);
    if (stream != null && streams.isEmpty()) {
      setIdle(true);
    }
    return stream;
  }

  /** Records the current time as the idle start, or clears it when {@code value} is false. */
  private synchronized void setIdle(boolean value) {
    idleStartTimeNs = value ? System.nanoTime() : 0L;
  }

  /** Returns true if this connection is idle. */
  public synchronized boolean isIdle() {
    return idleStartTimeNs != 0L;
  }

  /** Returns the time in ns when this connection became idle or 0L if connection is not idle. */
  public synchronized long getIdleStartTimeNs() {
    return idleStartTimeNs;
  }

  /**
   * Returns a new locally-initiated stream.
   *
   * @param out true to create an output stream that we can use to send data
   *     to the remote peer. Corresponds to {@code FLAG_FIN}.
   * @param in true to create an input stream that the remote peer can use to
   *     send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}.
   * @throws IOException if this connection has been shut down, or if writing
   *     the SYN_STREAM frame fails.
   */
  public SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in)
      throws IOException {
    int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL);
    int associatedStreamId = 0; // TODO: permit the caller to specify an associated stream?
    int priority = 0; // TODO: permit the caller to specify a priority?
    int slot = 0; // TODO: permit the caller to specify a slot?
    SpdyStream stream;
    int streamId;

    // The writer lock is taken first and 'this' last, so that stream IDs are
    // written to the socket in the order in which they were allocated.
    synchronized (spdyWriter) {
      synchronized (this) {
        if (shutdown) {
          throw new IOException("shutdown");
        }
        streamId = nextStreamId;
        nextStreamId += 2;
        stream = new SpdyStream(streamId, this, flags, priority, slot, requestHeaders, settings);
        if (stream.isOpen()) {
          streams.put(streamId, stream);
          setIdle(false);
        }
      }

      spdyWriter.synStream(flags, streamId, associatedStreamId, priority, slot, requestHeaders);
    }

    return stream;
  }

  /** Writes a SYN_REPLY frame for {@code streamId} carrying the given header block. */
  void writeSynReply(int streamId, int flags, List<String> alternating) throws IOException {
    spdyWriter.synReply(flags, streamId, alternating);
  }

  /** Writes a complete data frame. */
  void writeFrame(byte[] bytes, int offset, int length) throws IOException {
    synchronized (spdyWriter) {
      spdyWriter.out.write(bytes, offset, length);
    }
  }

  /** Schedules {@link #writeSynReset} on the shared executor; write failures are ignored. */
  void writeSynResetLater(final int streamId, final int statusCode) {
    executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) {
      @Override public void execute() {
        try {
          writeSynReset(streamId, statusCode);
        } catch (IOException ignored) {
          // Best-effort write; the failure is deliberately not reported.
        }
      }
    });
  }

  /** Writes a RST_STREAM frame for {@code streamId} with the given status code. */
  void writeSynReset(int streamId, int statusCode) throws IOException {
    spdyWriter.rstStream(streamId, statusCode);
  }

  /** Schedules {@link #writeWindowUpdate} on the shared executor; write failures are ignored. */
  void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) {
    executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) {
      @Override public void execute() {
        try {
          writeWindowUpdate(streamId, deltaWindowSize);
        } catch (IOException ignored) {
          // Best-effort write; the failure is deliberately not reported.
        }
      }
    });
  }

  /** Writes a WINDOW_UPDATE frame for {@code streamId}. */
  void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException {
    spdyWriter.windowUpdate(streamId, deltaWindowSize);
  }

  /**
   * Sends a ping frame to the peer. Use the returned object to await the
   * ping's response and observe its round trip time.
   *
   * @throws IOException if this connection has been shut down, or if writing
   *     the ping frame fails.
   */
  public Ping ping() throws IOException {
    Ping ping = new Ping();
    int pingId;
    synchronized (this) {
      if (shutdown) {
        throw new IOException("shutdown");
      }
      pingId = nextPingId;
      nextPingId += 2;
      if (pings == null) pings = new HashMap<Integer, Ping>();
      pings.put(pingId, ping);
    }
    // Perform the blocking write only after releasing 'this'.
    writePing(pingId, ping);
    return ping;
  }

  /** Schedules {@link #writePing} on the shared executor; write failures are ignored. */
  private void writePingLater(final int streamId, final Ping ping) {
    executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %d", hostName, streamId) {
      @Override public void execute() {
        try {
          writePing(streamId, ping);
        } catch (IOException ignored) {
          // Best-effort write; the failure is deliberately not reported.
        }
      }
    });
  }

  /**
   * Writes a ping frame with the given ID.
   *
   * @param ping the locally-created ping whose send time should be recorded, or
   *     null when replying to a ping that the peer initiated.
   */
  private void writePing(int id, Ping ping) throws IOException {
    synchronized (spdyWriter) {
      // Observe the sent time immediately before performing I/O.
      if (ping != null) ping.send();
      spdyWriter.ping(0, id);
    }
  }

  /** Removes and returns the in-flight ping with the given ID, or null if there is none. */
  private synchronized Ping removePing(int id) {
    return pings != null ? pings.remove(id) : null;
  }

  /** Sends a noop frame to the peer. */
  public void noop() throws IOException {
    spdyWriter.noop();
  }

  /** Flushes the underlying output stream while holding the writer lock. */
  public void flush() throws IOException {
    synchronized (spdyWriter) {
      spdyWriter.out.flush();
    }
  }

  /**
   * Degrades this connection such that new streams can neither be created
   * locally, nor accepted from the remote peer. Existing streams are not
   * impacted. This is intended to permit an endpoint to gracefully stop
   * accepting new requests without harming previously established streams.
   *
   * <p>Calling this on a connection that is already shut down does nothing.
   *
   * @param statusCode one of {@link #GOAWAY_OK}, {@link
   *     #GOAWAY_INTERNAL_ERROR} or {@link #GOAWAY_PROTOCOL_ERROR}.
   */
  public void shutdown(int statusCode) throws IOException {
    synchronized (spdyWriter) {
      int lastGoodStreamId;
      synchronized (this) {
        if (shutdown) {
          return;
        }
        shutdown = true;
        lastGoodStreamId = this.lastGoodStreamId;
      }
      spdyWriter.goAway(0, lastGoodStreamId, statusCode);
    }
  }

  /**
   * Closes this connection. This cancels all open streams and unanswered
   * pings. It closes the underlying input and output streams and shuts down
   * internal executor services.
   */
  @Override public void close() throws IOException {
    close(GOAWAY_OK, SpdyStream.RST_CANCEL);
  }

  /**
   * Shuts the connection down, closes every registered stream, cancels every
   * in-flight ping and closes the reader and writer. Every step is attempted
   * even if an earlier one fails; the first IOException encountered is
   * rethrown once all steps have run.
   *
   * @param shutdownStatusCode status code for the GOAWAY frame.
   * @param rstStatusCode status code used to close each stream.
   */
  private void close(int shutdownStatusCode, int rstStatusCode) throws IOException {
    assert (!Thread.holdsLock(this));
    IOException thrown = null;
    try {
      shutdown(shutdownStatusCode);
    } catch (IOException e) {
      thrown = e;
    }

    // Snapshot and clear the shared state under the lock; the potentially
    // blocking close/cancel calls happen after the lock is released.
    SpdyStream[] streamsToClose = null;
    Ping[] pingsToCancel = null;
    synchronized (this) {
      if (!streams.isEmpty()) {
        streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]);
        streams.clear();
        setIdle(false);
      }
      if (pings != null) {
        pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
        pings = null;
      }
    }

    if (streamsToClose != null) {
      for (SpdyStream stream : streamsToClose) {
        try {
          stream.close(rstStatusCode);
        } catch (IOException e) {
          // Keep the first failure; previously this condition was inverted and
          // stream-close failures were silently dropped.
          if (thrown == null) thrown = e;
        }
      }
    }

    if (pingsToCancel != null) {
      for (Ping ping : pingsToCancel) {
        ping.cancel();
      }
    }

    try {
      spdyReader.close();
    } catch (IOException e) {
      // Do not clobber an earlier failure.
      if (thrown == null) thrown = e;
    }
    try {
      spdyWriter.close();
    } catch (IOException e) {
      if (thrown == null) thrown = e;
    }

    if (thrown != null) throw thrown;
  }

  /** Collects the parameters needed to create a {@link SpdyConnection}. */
  public static class Builder {
    private String hostName;
    private InputStream in;
    private OutputStream out;
    private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
    public boolean client;

    /** Uses the socket's streams and an empty host name. */
    public Builder(boolean client, Socket socket) throws IOException {
      this("", client, socket.getInputStream(), socket.getOutputStream());
    }

    /** Uses the given streams and an empty host name. */
    public Builder(boolean client, InputStream in, OutputStream out) {
      this("", client, in, out);
    }

    /**
     * @param client true if this peer initiated the connection; false if
     *     this peer accepted the connection.
     */
    public Builder(String hostName, boolean client, Socket socket) throws IOException {
      this(hostName, client, socket.getInputStream(), socket.getOutputStream());
    }

    /**
     * @param client true if this peer initiated the connection; false if this
     *     peer accepted the connection.
     */
    public Builder(String hostName, boolean client, InputStream in, OutputStream out) {
      this.hostName = hostName;
      this.client = client;
      this.in = in;
      this.out = out;
    }

    /** Sets the handler invoked for each incoming stream and returns this builder. */
    public Builder handler(IncomingStreamHandler handler) {
      this.handler = handler;
      return this;
    }

    /** Creates the connection; the connection's constructor starts its reader thread. */
    public SpdyConnection build() {
      return new SpdyConnection(this);
    }
  }

  /**
   * Reads frames from the peer on a dedicated thread and dispatches each one to
   * the matching handler method below.
   */
  private class Reader implements Runnable, SpdyReader.Handler {
    /**
     * Reads frames until the reader reports that there are no more, then closes
     * the connection. The status codes start as "internal error" so that an
     * unchecked exception escaping the loop still closes the connection with
     * an error status from the finally block.
     */
    @Override public void run() {
      int shutdownStatusCode = GOAWAY_INTERNAL_ERROR;
      int rstStatusCode = SpdyStream.RST_INTERNAL_ERROR;
      try {
        while (spdyReader.nextFrame(this)) {
        }
        shutdownStatusCode = GOAWAY_OK;
        rstStatusCode = SpdyStream.RST_CANCEL;
      } catch (IOException e) {
        shutdownStatusCode = GOAWAY_PROTOCOL_ERROR;
        rstStatusCode = SpdyStream.RST_PROTOCOL_ERROR;
      } finally {
        try {
          close(shutdownStatusCode, rstStatusCode);
        } catch (IOException ignored) {
          // Nothing more can be done on the reader thread.
        }
      }
    }

    /**
     * Delivers a data frame to its stream. Data for an unknown stream is
     * answered with RST_INVALID_STREAM and its payload is skipped.
     */
    @Override public void data(int flags, int streamId, InputStream in, int length)
        throws IOException {
      SpdyStream dataStream = getStream(streamId);
      if (dataStream == null) {
        writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
        Util.skipByReading(in, length);
        return;
      }
      dataStream.receiveData(in, length);
      if ((flags & SpdyConnection.FLAG_FIN) != 0) {
        dataStream.receiveFin();
      }
    }

    /**
     * Registers a remotely-initiated stream and hands it to the incoming-stream
     * handler on the shared executor. The frame is ignored after shutdown. If
     * the ID is already in use, the existing stream is closed with
     * RST_PROTOCOL_ERROR and the ID is unregistered.
     */
    @Override public void synStream(int flags, int streamId, int associatedStreamId, int priority,
        int slot, List<String> nameValueBlock) {
      final SpdyStream synStream;
      final SpdyStream previous;
      synchronized (SpdyConnection.this) {
        synStream =
            new SpdyStream(streamId, SpdyConnection.this, flags, priority, slot, nameValueBlock,
                settings);
        if (shutdown) {
          return;
        }
        lastGoodStreamId = streamId;
        previous = streams.put(streamId, synStream);
      }
      if (previous != null) {
        previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
        removeStream(streamId);
        return;
      }

      executor.submit(new NamedRunnable("OkHttp SPDY Callback %s stream %d", hostName, streamId) {
        @Override public void execute() {
          try {
            handler.receive(synStream);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      });
    }

    /**
     * Delivers a SYN_REPLY to its stream. A reply for an unknown stream is
     * answered with RST_INVALID_STREAM.
     */
    @Override public void synReply(int flags, int streamId, List<String> nameValueBlock)
        throws IOException {
      SpdyStream replyStream = getStream(streamId);
      if (replyStream == null) {
        writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
        return;
      }
      replyStream.receiveReply(nameValueBlock);
      if ((flags & SpdyConnection.FLAG_FIN) != 0) {
        replyStream.receiveFin();
      }
    }

    /** Delivers a HEADERS frame to its stream; frames for unknown streams are ignored. */
    @Override public void headers(int flags, int streamId, List<String> nameValueBlock)
        throws IOException {
      SpdyStream replyStream = getStream(streamId);
      if (replyStream != null) {
        replyStream.receiveHeaders(nameValueBlock);
      }
    }

    /** Unregisters the stream named by a RST_STREAM frame and notifies it, if it exists. */
    @Override public void rstStream(int flags, int streamId, int statusCode) {
      SpdyStream rstStream = removeStream(streamId);
      if (rstStream != null) {
        rstStream.receiveRstStream(statusCode);
      }
    }

    /**
     * Replaces or merges the connection settings, then notifies every stream
     * that was registered at the time of the update.
     */
    @Override public void settings(int flags, Settings newSettings) {
      SpdyStream[] streamsToNotify = null;
      synchronized (SpdyConnection.this) {
        if (settings == null || (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) {
          settings = newSettings;
        } else {
          settings.merge(newSettings);
        }
        if (!streams.isEmpty()) {
          streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
        }
      }
      if (streamsToNotify != null) {
        for (SpdyStream stream : streamsToNotify) {
          // The synchronization here is ugly. We need to synchronize on 'this' to guard
          // reads to 'settings'. We synchronize on 'stream' to guard the state change.
          // And we need to acquire the 'stream' lock first, since that may block.
          synchronized (stream) {
            synchronized (SpdyConnection.this) {
              stream.receiveSettings(settings);
            }
          }
        }
      }
    }

    /** Noop frames require no action. */
    @Override public void noop() {
    }

    /**
     * Handles a ping frame: replies to pings that the peer initiated, and
     * completes the matching in-flight ping for replies to our own pings.
     */
    @Override public void ping(int flags, int streamId) {
      if (client != (streamId % 2 == 1)) {
        // Respond to a client ping if this is a server and vice versa.
        writePingLater(streamId, null);
      } else {
        Ping ping = removePing(streamId);
        if (ping != null) {
          ping.receive();
        }
      }
    }

    /**
     * Marks the connection as shut down and refuses every locally-initiated
     * stream whose ID is greater than {@code lastGoodStreamId}.
     */
    @Override public void goAway(int flags, int lastGoodStreamId, int statusCode) {
      // Collect and unregister the refused streams while holding the connection
      // lock, but notify them only after releasing it: calling into a stream
      // while holding 'this' would invert the stream-then-connection lock order
      // used by settings() and could block under the connection lock.
      SpdyStream[] refusedStreams;
      int refusedCount = 0;
      synchronized (SpdyConnection.this) {
        shutdown = true;

        refusedStreams = new SpdyStream[streams.size()];
        for (Iterator<Map.Entry<Integer, SpdyStream>> i = streams.entrySet().iterator();
            i.hasNext(); ) {
          Map.Entry<Integer, SpdyStream> entry = i.next();
          int streamId = entry.getKey();
          SpdyStream candidate = entry.getValue();
          if (streamId > lastGoodStreamId && candidate.isLocallyInitiated()) {
            refusedStreams[refusedCount++] = candidate;
            i.remove();
          }
        }
      }

      // Fail all streams created after the last good stream ID.
      for (int index = 0; index < refusedCount; index++) {
        refusedStreams[index].receiveRstStream(SpdyStream.RST_REFUSED_STREAM);
      }
    }

    /** Delivers a WINDOW_UPDATE to its stream; frames for unknown streams are ignored. */
    @Override public void windowUpdate(int flags, int streamId, int deltaWindowSize) {
      SpdyStream stream = getStream(streamId);
      if (stream != null) {
        stream.receiveWindowUpdate(deltaWindowSize);
      }
    }
  }
}