1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.squareup.okhttp.internal.spdy; 17 18 import com.squareup.okhttp.Protocol; 19 import com.squareup.okhttp.internal.NamedRunnable; 20 import com.squareup.okhttp.internal.Util; 21 import java.io.Closeable; 22 import java.io.IOException; 23 import java.io.InterruptedIOException; 24 import java.net.InetSocketAddress; 25 import java.net.Socket; 26 import java.util.HashMap; 27 import java.util.LinkedHashSet; 28 import java.util.List; 29 import java.util.Map; 30 import java.util.Set; 31 import java.util.concurrent.ExecutorService; 32 import java.util.concurrent.LinkedBlockingQueue; 33 import java.util.concurrent.SynchronousQueue; 34 import java.util.concurrent.ThreadPoolExecutor; 35 import java.util.concurrent.TimeUnit; 36 import okio.Buffer; 37 import okio.BufferedSource; 38 import okio.ByteString; 39 import okio.Okio; 40 41 import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE; 42 43 /** 44 * A socket connection to a remote peer. A connection hosts streams which can 45 * send and receive data. 46 * 47 * <p>Many methods in this API are <strong>synchronous:</strong> the call is 48 * completed before the method returns. This is typical for Java but atypical 49 * for SPDY. This is motivated by exception transparency: an IOException that 50 * was triggered by a certain caller can be caught and handled by that caller. 51 */ 52 public final class SpdyConnection implements Closeable { 53 54 // Internal state of this connection is guarded by 'this'. No blocking 55 // operations may be performed while holding this lock! 56 // 57 // Socket writes are guarded by frameWriter. 58 // 59 // Socket reads are unguarded but are only made by the reader thread. 60 // 61 // Certain operations (like SYN_STREAM) need to synchronize on both the 62 // frameWriter (to do blocking I/O) and this (to create streams). Such 63 // operations must synchronize on 'this' last. This ensures that we never 64 // wait for a blocking operation while holding 'this'. 65 66 private static final ExecutorService executor = new ThreadPoolExecutor(0, 67 Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 68 Util.threadFactory("OkHttp SpdyConnection", true)); 69 70 /** The protocol variant, like {@link com.squareup.okhttp.internal.spdy.Spdy3}. */ 71 final Protocol protocol; 72 73 /** True if this peer initiated the connection. */ 74 final boolean client; 75 76 /** 77 * User code to run in response to an incoming stream. Callbacks must not be 78 * run on the callback executor. 79 */ 80 private final IncomingStreamHandler handler; 81 private final Map<Integer, SpdyStream> streams = new HashMap<>(); 82 private final String hostName; 83 private int lastGoodStreamId; 84 private int nextStreamId; 85 private boolean shutdown; 86 private long idleStartTimeNs = System.nanoTime(); 87 88 /** Ensures push promise callbacks events are sent in order per stream. */ 89 private final ExecutorService pushExecutor; 90 91 /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ 92 private Map<Integer, Ping> pings; 93 /** User code to run in response to push promise events. */ 94 private final PushObserver pushObserver; 95 private int nextPingId; 96 97 /** 98 * The total number of bytes consumed by the application, but not yet 99 * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection. 100 */ 101 // Visible for testing 102 long unacknowledgedBytesRead = 0; 103 104 /** 105 * Count of bytes that can be written on the connection before receiving a 106 * window update. 107 */ 108 // Visible for testing 109 long bytesLeftInWriteWindow; 110 111 /** Settings we communicate to the peer. */ 112 // TODO: Do we want to dynamically adjust settings, or KISS and only set once? 113 final Settings okHttpSettings = new Settings(); 114 // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max); 115 private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024; 116 117 /** Settings we receive from the peer. */ 118 // TODO: MWS will need to guard on this setting before attempting to push. 119 final Settings peerSettings = new Settings(); 120 121 private boolean receivedInitialPeerSettings = false; 122 final Variant variant; 123 final Socket socket; 124 final FrameWriter frameWriter; 125 126 // Visible for testing 127 final Reader readerRunnable; 128 129 private SpdyConnection(Builder builder) throws IOException { 130 protocol = builder.protocol; 131 pushObserver = builder.pushObserver; 132 client = builder.client; 133 handler = builder.handler; 134 // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1 135 nextStreamId = builder.client ? 1 : 2; 136 if (builder.client && protocol == Protocol.HTTP_2) { 137 nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade. 138 } 139 140 nextPingId = builder.client ? 1 : 2; 141 142 // Flow control was designed more for servers, or proxies than edge clients. 143 // If we are a client, set the flow control window to 16MiB. This avoids 144 // thrashing window updates every 64KiB, yet small enough to avoid blowing 145 // up the heap. 146 if (builder.client) { 147 okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE); 148 } 149 150 hostName = builder.hostName; 151 152 if (protocol == Protocol.HTTP_2) { 153 variant = new Http2(); 154 // Like newSingleThreadExecutor, except lazy creates the thread. 155 pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, 156 new LinkedBlockingQueue<Runnable>(), 157 Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true)); 158 // 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2 159 peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535); 160 peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE); 161 } else if (protocol == Protocol.SPDY_3) { 162 variant = new Spdy3(); 163 pushExecutor = null; 164 } else { 165 throw new AssertionError(protocol); 166 } 167 bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 168 socket = builder.socket; 169 frameWriter = variant.newWriter(Okio.buffer(Okio.sink(builder.socket)), client); 170 171 readerRunnable = new Reader(); 172 new Thread(readerRunnable).start(); // Not a daemon thread. 173 } 174 175 /** The protocol as selected using ALPN. */ 176 public Protocol getProtocol() { 177 return protocol; 178 } 179 180 /** 181 * Returns the number of {@link SpdyStream#isOpen() open streams} on this 182 * connection. 183 */ 184 public synchronized int openStreamCount() { 185 return streams.size(); 186 } 187 188 synchronized SpdyStream getStream(int id) { 189 return streams.get(id); 190 } 191 192 synchronized SpdyStream removeStream(int streamId) { 193 SpdyStream stream = streams.remove(streamId); 194 if (stream != null && streams.isEmpty()) { 195 setIdle(true); 196 } 197 notifyAll(); // The removed stream may be blocked on a connection-wide window update. 198 return stream; 199 } 200 201 private synchronized void setIdle(boolean value) { 202 idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE; 203 } 204 205 /** Returns true if this connection is idle. */ 206 public synchronized boolean isIdle() { 207 return idleStartTimeNs != Long.MAX_VALUE; 208 } 209 210 /** 211 * Returns the time in ns when this connection became idle or Long.MAX_VALUE 212 * if connection is not idle. 213 */ 214 public synchronized long getIdleStartTimeNs() { 215 return idleStartTimeNs; 216 } 217 218 /** 219 * Returns a new server-initiated stream. 220 * 221 * @param associatedStreamId the stream that triggered the sender to create 222 * this stream. 223 * @param out true to create an output stream that we can use to send data 224 * to the remote peer. Corresponds to {@code FLAG_FIN}. 225 */ 226 public SpdyStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out) 227 throws IOException { 228 if (client) throw new IllegalStateException("Client cannot push requests."); 229 if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2"); 230 return newStream(associatedStreamId, requestHeaders, out, false); 231 } 232 233 /** 234 * Returns a new locally-initiated stream. 235 * 236 * @param out true to create an output stream that we can use to send data to the remote peer. 237 * Corresponds to {@code FLAG_FIN}. 238 * @param in true to create an input stream that the remote peer can use to send data to us. 239 * Corresponds to {@code FLAG_UNIDIRECTIONAL}. 240 */ 241 public SpdyStream newStream(List<Header> requestHeaders, boolean out, boolean in) 242 throws IOException { 243 return newStream(0, requestHeaders, out, in); 244 } 245 246 private SpdyStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out, 247 boolean in) throws IOException { 248 boolean outFinished = !out; 249 boolean inFinished = !in; 250 SpdyStream stream; 251 int streamId; 252 253 synchronized (frameWriter) { 254 synchronized (this) { 255 if (shutdown) { 256 throw new IOException("shutdown"); 257 } 258 streamId = nextStreamId; 259 nextStreamId += 2; 260 stream = new SpdyStream(streamId, this, outFinished, inFinished, requestHeaders); 261 if (stream.isOpen()) { 262 streams.put(streamId, stream); 263 setIdle(false); 264 } 265 } 266 if (associatedStreamId == 0) { 267 frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, 268 requestHeaders); 269 } else if (client) { 270 throw new IllegalArgumentException("client streams shouldn't have associated stream IDs"); 271 } else { // HTTP/2 has a PUSH_PROMISE frame. 272 frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders); 273 } 274 } 275 276 if (!out) { 277 frameWriter.flush(); 278 } 279 280 return stream; 281 } 282 283 void writeSynReply(int streamId, boolean outFinished, List<Header> alternating) 284 throws IOException { 285 frameWriter.synReply(outFinished, streamId, alternating); 286 } 287 288 /** 289 * Callers of this method are not thread safe, and sometimes on application threads. Most often, 290 * this method will be called to send a buffer worth of data to the peer. 291 * 292 * <p>Writes are subject to the write window of the stream and the connection. Until there is a 293 * window sufficient to send {@code byteCount}, the caller will block. For example, a user of 294 * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's 295 * write window will block. 296 * 297 * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only 298 * use case for zero {@code byteCount} is closing a flushed output stream. 299 */ 300 public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount) 301 throws IOException { 302 if (byteCount == 0) { // Empty data frames are not flow-controlled. 303 frameWriter.data(outFinished, streamId, buffer, 0); 304 return; 305 } 306 307 while (byteCount > 0) { 308 int toWrite; 309 synchronized (SpdyConnection.this) { 310 try { 311 while (bytesLeftInWriteWindow <= 0) { 312 // Before blocking, confirm that the stream we're writing is still open. It's possible 313 // that the stream has since been closed (such as if this write timed out.) 314 if (!streams.containsKey(streamId)) { 315 throw new IOException("stream closed"); 316 } 317 SpdyConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE. 318 } 319 } catch (InterruptedException e) { 320 throw new InterruptedIOException(); 321 } 322 323 toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow); 324 toWrite = Math.min(toWrite, frameWriter.maxDataLength()); 325 bytesLeftInWriteWindow -= toWrite; 326 } 327 328 byteCount -= toWrite; 329 frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite); 330 } 331 } 332 333 /** 334 * {@code delta} will be negative if a settings frame initial window is 335 * smaller than the last. 336 */ 337 void addBytesToWriteWindow(long delta) { 338 bytesLeftInWriteWindow += delta; 339 if (delta > 0) SpdyConnection.this.notifyAll(); 340 } 341 342 void writeSynResetLater(final int streamId, final ErrorCode errorCode) { 343 executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { 344 @Override public void execute() { 345 try { 346 writeSynReset(streamId, errorCode); 347 } catch (IOException ignored) { 348 } 349 } 350 }); 351 } 352 353 void writeSynReset(int streamId, ErrorCode statusCode) throws IOException { 354 frameWriter.rstStream(streamId, statusCode); 355 } 356 357 void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) { 358 executor.execute(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) { 359 @Override public void execute() { 360 try { 361 frameWriter.windowUpdate(streamId, unacknowledgedBytesRead); 362 } catch (IOException ignored) { 363 } 364 } 365 }); 366 } 367 368 /** 369 * Sends a ping frame to the peer. Use the returned object to await the 370 * ping's response and observe its round trip time. 371 */ 372 public Ping ping() throws IOException { 373 Ping ping = new Ping(); 374 int pingId; 375 synchronized (this) { 376 if (shutdown) { 377 throw new IOException("shutdown"); 378 } 379 pingId = nextPingId; 380 nextPingId += 2; 381 if (pings == null) pings = new HashMap<>(); 382 pings.put(pingId, ping); 383 } 384 writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping); 385 return ping; 386 } 387 388 private void writePingLater( 389 final boolean reply, final int payload1, final int payload2, final Ping ping) { 390 executor.execute(new NamedRunnable("OkHttp %s ping %08x%08x", 391 hostName, payload1, payload2) { 392 @Override public void execute() { 393 try { 394 writePing(reply, payload1, payload2, ping); 395 } catch (IOException ignored) { 396 } 397 } 398 }); 399 } 400 401 private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException { 402 synchronized (frameWriter) { 403 // Observe the sent time immediately before performing I/O. 404 if (ping != null) ping.send(); 405 frameWriter.ping(reply, payload1, payload2); 406 } 407 } 408 409 private synchronized Ping removePing(int id) { 410 return pings != null ? pings.remove(id) : null; 411 } 412 413 public void flush() throws IOException { 414 frameWriter.flush(); 415 } 416 417 /** 418 * Degrades this connection such that new streams can neither be created 419 * locally, nor accepted from the remote peer. Existing streams are not 420 * impacted. This is intended to permit an endpoint to gracefully stop 421 * accepting new requests without harming previously established streams. 422 */ 423 public void shutdown(ErrorCode statusCode) throws IOException { 424 synchronized (frameWriter) { 425 int lastGoodStreamId; 426 synchronized (this) { 427 if (shutdown) { 428 return; 429 } 430 shutdown = true; 431 lastGoodStreamId = this.lastGoodStreamId; 432 } 433 // TODO: propagate exception message into debugData 434 frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY); 435 } 436 } 437 438 /** 439 * Closes this connection. This cancels all open streams and unanswered 440 * pings. It closes the underlying input and output streams and shuts down 441 * internal executor services. 442 */ 443 @Override public void close() throws IOException { 444 close(ErrorCode.NO_ERROR, ErrorCode.CANCEL); 445 } 446 447 private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException { 448 assert (!Thread.holdsLock(this)); 449 IOException thrown = null; 450 try { 451 shutdown(connectionCode); 452 } catch (IOException e) { 453 thrown = e; 454 } 455 456 SpdyStream[] streamsToClose = null; 457 Ping[] pingsToCancel = null; 458 synchronized (this) { 459 if (!streams.isEmpty()) { 460 streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]); 461 streams.clear(); 462 setIdle(false); 463 } 464 if (pings != null) { 465 pingsToCancel = pings.values().toArray(new Ping[pings.size()]); 466 pings = null; 467 } 468 } 469 470 if (streamsToClose != null) { 471 for (SpdyStream stream : streamsToClose) { 472 try { 473 stream.close(streamCode); 474 } catch (IOException e) { 475 if (thrown != null) thrown = e; 476 } 477 } 478 } 479 480 if (pingsToCancel != null) { 481 for (Ping ping : pingsToCancel) { 482 ping.cancel(); 483 } 484 } 485 486 // Close the writer to release its resources (such as deflaters). 487 try { 488 frameWriter.close(); 489 } catch (IOException e) { 490 if (thrown == null) thrown = e; 491 } 492 493 // Close the socket to break out the reader thread, which will clean up after itself. 494 try { 495 socket.close(); 496 } catch (IOException e) { 497 thrown = e; 498 } 499 500 if (thrown != null) throw thrown; 501 } 502 503 /** 504 * Sends a connection header if the current variant requires it. This should 505 * be called after {@link Builder#build} for all new connections. 506 */ 507 public void sendConnectionPreface() throws IOException { 508 frameWriter.connectionPreface(); 509 frameWriter.settings(okHttpSettings); 510 int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE); 511 if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) { 512 frameWriter.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE); 513 } 514 } 515 516 public static class Builder { 517 private String hostName; 518 private Socket socket; 519 private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; 520 private Protocol protocol = Protocol.SPDY_3; 521 private PushObserver pushObserver = PushObserver.CANCEL; 522 private boolean client; 523 524 public Builder(boolean client, Socket socket) throws IOException { 525 this(((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), client, socket); 526 } 527 528 /** 529 * @param client true if this peer initiated the connection; false if this 530 * peer accepted the connection. 531 */ 532 public Builder(String hostName, boolean client, Socket socket) throws IOException { 533 this.hostName = hostName; 534 this.client = client; 535 this.socket = socket; 536 } 537 538 public Builder handler(IncomingStreamHandler handler) { 539 this.handler = handler; 540 return this; 541 } 542 543 public Builder protocol(Protocol protocol) { 544 this.protocol = protocol; 545 return this; 546 } 547 548 public Builder pushObserver(PushObserver pushObserver) { 549 this.pushObserver = pushObserver; 550 return this; 551 } 552 553 public SpdyConnection build() throws IOException { 554 return new SpdyConnection(this); 555 } 556 } 557 558 /** 559 * Methods in this class must not lock FrameWriter. If a method needs to 560 * write a frame, create an async task to do so. 561 */ 562 class Reader extends NamedRunnable implements FrameReader.Handler { 563 FrameReader frameReader; 564 565 private Reader() { 566 super("OkHttp %s", hostName); 567 } 568 569 @Override protected void execute() { 570 ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR; 571 ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR; 572 try { 573 frameReader = variant.newReader(Okio.buffer(Okio.source(socket)), client); 574 if (!client) { 575 frameReader.readConnectionPreface(); 576 } 577 while (frameReader.nextFrame(this)) { 578 } 579 connectionErrorCode = ErrorCode.NO_ERROR; 580 streamErrorCode = ErrorCode.CANCEL; 581 } catch (RuntimeException | IOException e) { 582 connectionErrorCode = ErrorCode.PROTOCOL_ERROR; 583 streamErrorCode = ErrorCode.PROTOCOL_ERROR; 584 } finally { 585 try { 586 close(connectionErrorCode, streamErrorCode); 587 } catch (IOException ignored) { 588 } 589 Util.closeQuietly(frameReader); 590 } 591 } 592 593 @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length) 594 throws IOException { 595 if (pushedStream(streamId)) { 596 pushDataLater(streamId, source, length, inFinished); 597 return; 598 } 599 SpdyStream dataStream = getStream(streamId); 600 if (dataStream == null) { 601 writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); 602 source.skip(length); 603 return; 604 } 605 dataStream.receiveData(source, length); 606 if (inFinished) { 607 dataStream.receiveFin(); 608 } 609 } 610 611 @Override public void headers(boolean outFinished, boolean inFinished, int streamId, 612 int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) { 613 if (pushedStream(streamId)) { 614 pushHeadersLater(streamId, headerBlock, inFinished); 615 return; 616 } 617 SpdyStream stream; 618 synchronized (SpdyConnection.this) { 619 // If we're shutdown, don't bother with this stream. 620 if (shutdown) return; 621 622 stream = getStream(streamId); 623 624 if (stream == null) { 625 // The headers claim to be for an existing stream, but we don't have one. 626 if (headersMode.failIfStreamAbsent()) { 627 writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); 628 return; 629 } 630 631 // If the stream ID is less than the last created ID, assume it's already closed. 632 if (streamId <= lastGoodStreamId) return; 633 634 // If the stream ID is in the client's namespace, assume it's already closed. 635 if (streamId % 2 == nextStreamId % 2) return; 636 637 // Create a stream. 638 final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished, 639 inFinished, headerBlock); 640 lastGoodStreamId = streamId; 641 streams.put(streamId, newStream); 642 executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { 643 @Override public void execute() { 644 try { 645 handler.receive(newStream); 646 } catch (RuntimeException | IOException e) { 647 try { 648 newStream.close(ErrorCode.PROTOCOL_ERROR); 649 } catch (IOException ignored) { 650 } 651 } 652 } 653 }); 654 return; 655 } 656 } 657 658 // The headers claim to be for a new stream, but we already have one. 659 if (headersMode.failIfStreamPresent()) { 660 stream.closeLater(ErrorCode.PROTOCOL_ERROR); 661 removeStream(streamId); 662 return; 663 } 664 665 // Update an existing stream. 666 stream.receiveHeaders(headerBlock, headersMode); 667 if (inFinished) stream.receiveFin(); 668 } 669 670 @Override public void rstStream(int streamId, ErrorCode errorCode) { 671 if (pushedStream(streamId)) { 672 pushResetLater(streamId, errorCode); 673 return; 674 } 675 SpdyStream rstStream = removeStream(streamId); 676 if (rstStream != null) { 677 rstStream.receiveRstStream(errorCode); 678 } 679 } 680 681 @Override public void settings(boolean clearPrevious, Settings newSettings) { 682 long delta = 0; 683 SpdyStream[] streamsToNotify = null; 684 synchronized (SpdyConnection.this) { 685 int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 686 if (clearPrevious) peerSettings.clear(); 687 peerSettings.merge(newSettings); 688 if (getProtocol() == Protocol.HTTP_2) { 689 ackSettingsLater(newSettings); 690 } 691 int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 692 if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) { 693 delta = peerInitialWindowSize - priorWriteWindowSize; 694 if (!receivedInitialPeerSettings) { 695 addBytesToWriteWindow(delta); 696 receivedInitialPeerSettings = true; 697 } 698 if (!streams.isEmpty()) { 699 streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]); 700 } 701 } 702 } 703 if (streamsToNotify != null && delta != 0) { 704 for (SpdyStream stream : streamsToNotify) { 705 synchronized (stream) { 706 stream.addBytesToWriteWindow(delta); 707 } 708 } 709 } 710 } 711 712 private void ackSettingsLater(final Settings peerSettings) { 713 executor.execute(new NamedRunnable("OkHttp %s ACK Settings", hostName) { 714 @Override public void execute() { 715 try { 716 frameWriter.ackSettings(peerSettings); 717 } catch (IOException ignored) { 718 } 719 } 720 }); 721 } 722 723 @Override public void ackSettings() { 724 // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT. 725 } 726 727 @Override public void ping(boolean reply, int payload1, int payload2) { 728 if (reply) { 729 Ping ping = removePing(payload1); 730 if (ping != null) { 731 ping.receive(); 732 } 733 } else { 734 // Send a reply to a client ping if this is a server and vice versa. 735 writePingLater(true, payload1, payload2, null); 736 } 737 } 738 739 @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { 740 if (debugData.size() > 0) { // TODO: log the debugData 741 } 742 743 // Copy the streams first. We don't want to hold a lock when we call receiveRstStream(). 744 SpdyStream[] streamsCopy; 745 synchronized (SpdyConnection.this) { 746 streamsCopy = streams.values().toArray(new SpdyStream[streams.size()]); 747 shutdown = true; 748 } 749 750 // Fail all streams created after the last good stream ID. 751 for (SpdyStream spdyStream : streamsCopy) { 752 if (spdyStream.getId() > lastGoodStreamId && spdyStream.isLocallyInitiated()) { 753 spdyStream.receiveRstStream(ErrorCode.REFUSED_STREAM); 754 removeStream(spdyStream.getId()); 755 } 756 } 757 } 758 759 @Override public void windowUpdate(int streamId, long windowSizeIncrement) { 760 if (streamId == 0) { 761 synchronized (SpdyConnection.this) { 762 bytesLeftInWriteWindow += windowSizeIncrement; 763 SpdyConnection.this.notifyAll(); 764 } 765 } else { 766 SpdyStream stream = getStream(streamId); 767 if (stream != null) { 768 synchronized (stream) { 769 stream.addBytesToWriteWindow(windowSizeIncrement); 770 } 771 } 772 } 773 } 774 775 @Override public void priority(int streamId, int streamDependency, int weight, 776 boolean exclusive) { 777 // TODO: honor priority. 778 } 779 780 @Override 781 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) { 782 pushRequestLater(promisedStreamId, requestHeaders); 783 } 784 785 @Override public void alternateService(int streamId, String origin, ByteString protocol, 786 String host, int port, long maxAge) { 787 // TODO: register alternate service. 788 } 789 } 790 791 /** Even, positive numbered streams are pushed streams in HTTP/2. */ 792 private boolean pushedStream(int streamId) { 793 return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0; 794 } 795 796 // Guarded by this. 797 private final Set<Integer> currentPushRequests = new LinkedHashSet<>(); 798 799 private void pushRequestLater(final int streamId, final List<Header> requestHeaders) { 800 synchronized (this) { 801 if (currentPushRequests.contains(streamId)) { 802 writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR); 803 return; 804 } 805 currentPushRequests.add(streamId); 806 } 807 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) { 808 @Override public void execute() { 809 boolean cancel = pushObserver.onRequest(streamId, requestHeaders); 810 try { 811 if (cancel) { 812 frameWriter.rstStream(streamId, ErrorCode.CANCEL); 813 synchronized (SpdyConnection.this) { 814 currentPushRequests.remove(streamId); 815 } 816 } 817 } catch (IOException ignored) { 818 } 819 } 820 }); 821 } 822 823 private void pushHeadersLater(final int streamId, final List<Header> requestHeaders, 824 final boolean inFinished) { 825 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) { 826 @Override public void execute() { 827 boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished); 828 try { 829 if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); 830 if (cancel || inFinished) { 831 synchronized (SpdyConnection.this) { 832 currentPushRequests.remove(streamId); 833 } 834 } 835 } catch (IOException ignored) { 836 } 837 } 838 }); 839 } 840 841 /** 842 * Eagerly reads {@code byteCount} bytes from the source before launching a background task to 843 * process the data. This avoids corrupting the stream. 844 */ 845 private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount, 846 final boolean inFinished) throws IOException { 847 final Buffer buffer = new Buffer(); 848 source.require(byteCount); // Eagerly read the frame before firing client thread. 849 source.read(buffer, byteCount); 850 if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount); 851 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) { 852 @Override public void execute() { 853 try { 854 boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished); 855 if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); 856 if (cancel || inFinished) { 857 synchronized (SpdyConnection.this) { 858 currentPushRequests.remove(streamId); 859 } 860 } 861 } catch (IOException ignored) { 862 } 863 } 864 }); 865 } 866 867 private void pushResetLater(final int streamId, final ErrorCode errorCode) { 868 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) { 869 @Override public void execute() { 870 pushObserver.onReset(streamId, errorCode); 871 synchronized (SpdyConnection.this) { 872 currentPushRequests.remove(streamId); 873 } 874 } 875 }); 876 } 877 } 878