1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.squareup.okhttp.internal.spdy; 17 18 import com.squareup.okhttp.Protocol; 19 import com.squareup.okhttp.internal.NamedRunnable; 20 import com.squareup.okhttp.internal.Util; 21 import java.io.Closeable; 22 import java.io.IOException; 23 import java.io.InterruptedIOException; 24 import java.net.InetSocketAddress; 25 import java.net.Socket; 26 import java.util.HashMap; 27 import java.util.Iterator; 28 import java.util.LinkedHashSet; 29 import java.util.List; 30 import java.util.Map; 31 import java.util.Set; 32 import java.util.concurrent.ExecutorService; 33 import java.util.concurrent.SynchronousQueue; 34 import java.util.concurrent.ThreadPoolExecutor; 35 import java.util.concurrent.TimeUnit; 36 import okio.BufferedSink; 37 import okio.BufferedSource; 38 import okio.ByteString; 39 import okio.OkBuffer; 40 import okio.Okio; 41 42 import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE; 43 44 /** 45 * A socket connection to a remote peer. A connection hosts streams which can 46 * send and receive data. 47 * 48 * <p>Many methods in this API are <strong>synchronous:</strong> the call is 49 * completed before the method returns. This is typical for Java but atypical 50 * for SPDY. This is motivated by exception transparency: an IOException that 51 * was triggered by a certain caller can be caught and handled by that caller. 52 */ 53 public final class SpdyConnection implements Closeable { 54 55 // Internal state of this connection is guarded by 'this'. No blocking 56 // operations may be performed while holding this lock! 57 // 58 // Socket writes are guarded by frameWriter. 59 // 60 // Socket reads are unguarded but are only made by the reader thread. 61 // 62 // Certain operations (like SYN_STREAM) need to synchronize on both the 63 // frameWriter (to do blocking I/O) and this (to create streams). Such 64 // operations must synchronize on 'this' last. This ensures that we never 65 // wait for a blocking operation while holding 'this'. 66 67 private static final ExecutorService executor = new ThreadPoolExecutor(0, 68 Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 69 Util.threadFactory("OkHttp SpdyConnection", true)); 70 71 /** The protocol variant, like {@link com.squareup.okhttp.internal.spdy.Spdy3}. */ 72 final Protocol protocol; 73 74 /** True if this peer initiated the connection. */ 75 final boolean client; 76 77 /** 78 * User code to run in response to an incoming stream. Callbacks must not be 79 * run on the callback executor. 80 */ 81 private final IncomingStreamHandler handler; 82 private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>(); 83 private final String hostName; 84 private int lastGoodStreamId; 85 private int nextStreamId; 86 private boolean shutdown; 87 private long idleStartTimeNs = System.nanoTime(); 88 89 /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ 90 private Map<Integer, Ping> pings; 91 /** User code to run in response to push promise events. */ 92 private final PushObserver pushObserver; 93 private int nextPingId; 94 95 /** 96 * The total number of bytes consumed by the application, but not yet 97 * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection. 98 */ 99 // Visible for testing 100 long unacknowledgedBytesRead = 0; 101 102 /** 103 * Count of bytes that can be written on the connection before receiving a 104 * window update. 105 */ 106 // Visible for testing 107 long bytesLeftInWriteWindow; 108 109 /** Settings we communicate to the peer. */ 110 // TODO: Do we want to dynamically adjust settings, or KISS and only set once? 111 final Settings okHttpSettings = new Settings(); 112 // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max); 113 114 /** Settings we receive from the peer. */ 115 // TODO: MWS will need to guard on this setting before attempting to push. 116 final Settings peerSettings = new Settings(); 117 118 private boolean receivedInitialPeerSettings = false; 119 final FrameReader frameReader; 120 final FrameWriter frameWriter; 121 final long maxFrameSize; 122 123 // Visible for testing 124 final Reader readerRunnable; 125 126 private SpdyConnection(Builder builder) { 127 protocol = builder.protocol; 128 pushObserver = builder.pushObserver; 129 client = builder.client; 130 handler = builder.handler; 131 nextStreamId = builder.client ? 1 : 2; 132 nextPingId = builder.client ? 1 : 2; 133 134 // Flow control was designed more for servers, or proxies than edge clients. 135 // If we are a client, set the flow control window to 16MiB. This avoids 136 // thrashing window updates every 64KiB, yet small enough to avoid blowing 137 // up the heap. 138 if (builder.client) { 139 okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 16 * 1024 * 1024); 140 } 141 142 hostName = builder.hostName; 143 144 Variant variant; 145 if (protocol == Protocol.HTTP_2) { 146 variant = new Http20Draft09(); 147 } else if (protocol == Protocol.SPDY_3) { 148 variant = new Spdy3(); 149 } else { 150 throw new AssertionError(protocol); 151 } 152 bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 153 frameReader = variant.newReader(builder.source, client); 154 frameWriter = variant.newWriter(builder.sink, client); 155 maxFrameSize = variant.maxFrameSize(); 156 157 readerRunnable = new Reader(); 158 new Thread(readerRunnable).start(); // Not a daemon thread. 159 } 160 161 /** The protocol as selected using NPN or ALPN. */ 162 public Protocol getProtocol() { 163 return protocol; 164 } 165 166 /** 167 * Returns the number of {@link SpdyStream#isOpen() open streams} on this 168 * connection. 169 */ 170 public synchronized int openStreamCount() { 171 return streams.size(); 172 } 173 174 synchronized SpdyStream getStream(int id) { 175 return streams.get(id); 176 } 177 178 synchronized SpdyStream removeStream(int streamId) { 179 SpdyStream stream = streams.remove(streamId); 180 if (stream != null && streams.isEmpty()) { 181 setIdle(true); 182 } 183 return stream; 184 } 185 186 private synchronized void setIdle(boolean value) { 187 idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE; 188 } 189 190 /** Returns true if this connection is idle. */ 191 public synchronized boolean isIdle() { 192 return idleStartTimeNs != Long.MAX_VALUE; 193 } 194 195 /** 196 * Returns the time in ns when this connection became idle or Long.MAX_VALUE 197 * if connection is not idle. 198 */ 199 public synchronized long getIdleStartTimeNs() { 200 return idleStartTimeNs; 201 } 202 203 /** 204 * Returns a new server-initiated stream. 205 * 206 * @param associatedStreamId the stream that triggered the sender to create 207 * this stream. 208 * @param out true to create an output stream that we can use to send data 209 * to the remote peer. Corresponds to {@code FLAG_FIN}. 210 */ 211 public SpdyStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out) 212 throws IOException { 213 if (client) throw new IllegalStateException("Client cannot push requests."); 214 if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2"); 215 return newStream(associatedStreamId, requestHeaders, out, false); 216 } 217 218 /** 219 * Returns a new locally-initiated stream. 220 * 221 * @param out true to create an output stream that we can use to send data to the remote peer. 222 * Corresponds to {@code FLAG_FIN}. 223 * @param in true to create an input stream that the remote peer can use to send data to us. 224 * Corresponds to {@code FLAG_UNIDIRECTIONAL}. 225 */ 226 public SpdyStream newStream(List<Header> requestHeaders, boolean out, boolean in) 227 throws IOException { 228 return newStream(0, requestHeaders, out, in); 229 } 230 231 private SpdyStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out, 232 boolean in) throws IOException { 233 boolean outFinished = !out; 234 boolean inFinished = !in; 235 int priority = -1; // TODO: permit the caller to specify a priority? 236 int slot = 0; // TODO: permit the caller to specify a slot? 237 SpdyStream stream; 238 int streamId; 239 240 synchronized (frameWriter) { 241 synchronized (this) { 242 if (shutdown) { 243 throw new IOException("shutdown"); 244 } 245 streamId = nextStreamId; 246 nextStreamId += 2; 247 stream = new SpdyStream(streamId, this, outFinished, inFinished, priority, requestHeaders); 248 if (stream.isOpen()) { 249 streams.put(streamId, stream); 250 setIdle(false); 251 } 252 } 253 if (associatedStreamId == 0) { 254 frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, priority, slot, 255 requestHeaders); 256 } else if (client) { 257 throw new IllegalArgumentException("client streams shouldn't have associated stream IDs"); 258 } else { // HTTP/2 has a PUSH_PROMISE frame. 259 frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders); 260 } 261 } 262 263 if (!out) { 264 frameWriter.flush(); 265 } 266 267 return stream; 268 } 269 270 void writeSynReply(int streamId, boolean outFinished, List<Header> alternating) 271 throws IOException { 272 frameWriter.synReply(outFinished, streamId, alternating); 273 } 274 275 /** 276 * Callers of this method are not thread safe, and sometimes on application 277 * threads. Most often, this method will be called to send a buffer worth of 278 * data to the peer. 279 * <p> 280 * Writes are subject to the write window of the stream and the connection. 281 * Until there is a window sufficient to send {@code byteCount}, the caller 282 * will block. For example, a user of {@code HttpURLConnection} who flushes 283 * more bytes to the output stream than the connection's write window will 284 * block. 285 * <p> 286 * Zero {@code byteCount} writes are not subject to flow control and 287 * will not block. The only use case for zero {@code byteCount} is closing 288 * a flushed output stream. 289 */ 290 public void writeData(int streamId, boolean outFinished, OkBuffer buffer, long byteCount) 291 throws IOException { 292 if (byteCount == 0) { // Empty data frames are not flow-controlled. 293 frameWriter.data(outFinished, streamId, buffer, 0); 294 return; 295 } 296 297 while (byteCount > 0) { 298 int toWrite; 299 synchronized (SpdyConnection.this) { 300 try { 301 while (bytesLeftInWriteWindow <= 0) { 302 SpdyConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE. 303 } 304 } catch (InterruptedException e) { 305 throw new InterruptedIOException(); 306 } 307 308 toWrite = (int) Math.min(Math.min(byteCount, bytesLeftInWriteWindow), maxFrameSize); 309 bytesLeftInWriteWindow -= toWrite; 310 } 311 312 byteCount -= toWrite; 313 frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite); 314 } 315 } 316 317 /** 318 * {@code delta} will be negative if a settings frame initial window is 319 * smaller than the last. 320 */ 321 void addBytesToWriteWindow(long delta) { 322 bytesLeftInWriteWindow += delta; 323 if (delta > 0) SpdyConnection.this.notifyAll(); 324 } 325 326 void writeSynResetLater(final int streamId, final ErrorCode errorCode) { 327 executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { 328 @Override public void execute() { 329 try { 330 writeSynReset(streamId, errorCode); 331 } catch (IOException ignored) { 332 } 333 } 334 }); 335 } 336 337 void writeSynReset(int streamId, ErrorCode statusCode) throws IOException { 338 frameWriter.rstStream(streamId, statusCode); 339 } 340 341 void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) { 342 executor.submit(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) { 343 @Override public void execute() { 344 try { 345 frameWriter.windowUpdate(streamId, unacknowledgedBytesRead); 346 } catch (IOException ignored) { 347 } 348 } 349 }); 350 } 351 352 /** 353 * Sends a ping frame to the peer. Use the returned object to await the 354 * ping's response and observe its round trip time. 355 */ 356 public Ping ping() throws IOException { 357 Ping ping = new Ping(); 358 int pingId; 359 synchronized (this) { 360 if (shutdown) { 361 throw new IOException("shutdown"); 362 } 363 pingId = nextPingId; 364 nextPingId += 2; 365 if (pings == null) pings = new HashMap<Integer, Ping>(); 366 pings.put(pingId, ping); 367 } 368 writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping); 369 return ping; 370 } 371 372 private void writePingLater( 373 final boolean reply, final int payload1, final int payload2, final Ping ping) { 374 executor.submit(new NamedRunnable("OkHttp %s ping %08x%08x", 375 hostName, payload1, payload2) { 376 @Override public void execute() { 377 try { 378 writePing(reply, payload1, payload2, ping); 379 } catch (IOException ignored) { 380 } 381 } 382 }); 383 } 384 385 private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException { 386 synchronized (frameWriter) { 387 // Observe the sent time immediately before performing I/O. 388 if (ping != null) ping.send(); 389 frameWriter.ping(reply, payload1, payload2); 390 } 391 } 392 393 private synchronized Ping removePing(int id) { 394 return pings != null ? pings.remove(id) : null; 395 } 396 397 public void flush() throws IOException { 398 frameWriter.flush(); 399 } 400 401 /** 402 * Degrades this connection such that new streams can neither be created 403 * locally, nor accepted from the remote peer. Existing streams are not 404 * impacted. This is intended to permit an endpoint to gracefully stop 405 * accepting new requests without harming previously established streams. 406 */ 407 public void shutdown(ErrorCode statusCode) throws IOException { 408 synchronized (frameWriter) { 409 int lastGoodStreamId; 410 synchronized (this) { 411 if (shutdown) { 412 return; 413 } 414 shutdown = true; 415 lastGoodStreamId = this.lastGoodStreamId; 416 } 417 // TODO: propagate exception message into debugData 418 frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY); 419 } 420 } 421 422 /** 423 * Closes this connection. This cancels all open streams and unanswered 424 * pings. It closes the underlying input and output streams and shuts down 425 * internal executor services. 426 */ 427 @Override public void close() throws IOException { 428 close(ErrorCode.NO_ERROR, ErrorCode.CANCEL); 429 } 430 431 private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException { 432 assert (!Thread.holdsLock(this)); 433 IOException thrown = null; 434 try { 435 shutdown(connectionCode); 436 } catch (IOException e) { 437 thrown = e; 438 } 439 440 SpdyStream[] streamsToClose = null; 441 Ping[] pingsToCancel = null; 442 synchronized (this) { 443 if (!streams.isEmpty()) { 444 streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]); 445 streams.clear(); 446 setIdle(false); 447 } 448 if (pings != null) { 449 pingsToCancel = pings.values().toArray(new Ping[pings.size()]); 450 pings = null; 451 } 452 } 453 454 if (streamsToClose != null) { 455 for (SpdyStream stream : streamsToClose) { 456 try { 457 stream.close(streamCode); 458 } catch (IOException e) { 459 if (thrown != null) thrown = e; 460 } 461 } 462 } 463 464 if (pingsToCancel != null) { 465 for (Ping ping : pingsToCancel) { 466 ping.cancel(); 467 } 468 } 469 470 try { 471 frameReader.close(); 472 } catch (IOException e) { 473 thrown = e; 474 } 475 try { 476 frameWriter.close(); 477 } catch (IOException e) { 478 if (thrown == null) thrown = e; 479 } 480 481 if (thrown != null) throw thrown; 482 } 483 484 /** 485 * Sends a connection header if the current variant requires it. This should 486 * be called after {@link Builder#build} for all new connections. 487 */ 488 public void sendConnectionHeader() throws IOException { 489 frameWriter.connectionHeader(); 490 frameWriter.settings(okHttpSettings); 491 } 492 493 public static class Builder { 494 private String hostName; 495 private BufferedSource source; 496 private BufferedSink sink; 497 private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; 498 private Protocol protocol = Protocol.SPDY_3; 499 private PushObserver pushObserver = PushObserver.CANCEL; 500 private boolean client; 501 502 public Builder(boolean client, Socket socket) throws IOException { 503 this(((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), client, socket); 504 } 505 506 /** 507 * @param client true if this peer initiated the connection; false if this 508 * peer accepted the connection. 509 */ 510 public Builder(String hostName, boolean client, Socket socket) throws IOException { 511 this.hostName = hostName; 512 this.client = client; 513 this.source = Okio.buffer(Okio.source(socket.getInputStream())); 514 this.sink = Okio.buffer(Okio.sink(socket.getOutputStream())); 515 } 516 517 public Builder handler(IncomingStreamHandler handler) { 518 this.handler = handler; 519 return this; 520 } 521 522 public Builder protocol(Protocol protocol) { 523 this.protocol = protocol; 524 return this; 525 } 526 527 public Builder pushObserver(PushObserver pushObserver) { 528 this.pushObserver = pushObserver; 529 return this; 530 } 531 532 public SpdyConnection build() { 533 return new SpdyConnection(this); 534 } 535 } 536 537 /** 538 * Methods in this class must not lock FrameWriter. If a method needs to 539 * write a frame, create an async task to do so. 540 */ 541 class Reader extends NamedRunnable implements FrameReader.Handler { 542 private Reader() { 543 super("OkHttp %s", hostName); 544 } 545 546 @Override protected void execute() { 547 ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR; 548 ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR; 549 try { 550 if (!client) { 551 frameReader.readConnectionHeader(); 552 } 553 while (frameReader.nextFrame(this)) { 554 } 555 connectionErrorCode = ErrorCode.NO_ERROR; 556 streamErrorCode = ErrorCode.CANCEL; 557 } catch (IOException e) { 558 connectionErrorCode = ErrorCode.PROTOCOL_ERROR; 559 streamErrorCode = ErrorCode.PROTOCOL_ERROR; 560 } finally { 561 try { 562 close(connectionErrorCode, streamErrorCode); 563 } catch (IOException ignored) { 564 } 565 } 566 } 567 568 @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length) 569 throws IOException { 570 if (pushedStream(streamId)) { 571 pushDataLater(streamId, source, length, inFinished); 572 return; 573 } 574 SpdyStream dataStream = getStream(streamId); 575 if (dataStream == null) { 576 writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); 577 source.skip(length); 578 return; 579 } 580 dataStream.receiveData(source, length); 581 if (inFinished) { 582 dataStream.receiveFin(); 583 } 584 } 585 586 @Override public void headers(boolean outFinished, boolean inFinished, int streamId, 587 int associatedStreamId, int priority, List<Header> headerBlock, HeadersMode headersMode) { 588 if (pushedStream(streamId)) { 589 pushHeadersLater(streamId, headerBlock, inFinished); 590 return; 591 } 592 SpdyStream stream; 593 synchronized (SpdyConnection.this) { 594 // If we're shutdown, don't bother with this stream. 595 if (shutdown) return; 596 597 stream = getStream(streamId); 598 599 if (stream == null) { 600 // The headers claim to be for an existing stream, but we don't have one. 601 if (headersMode.failIfStreamAbsent()) { 602 writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); 603 return; 604 } 605 606 // If the stream ID is less than the last created ID, assume it's already closed. 607 if (streamId <= lastGoodStreamId) return; 608 609 // If the stream ID is in the client's namespace, assume it's already closed. 610 if (streamId % 2 == nextStreamId % 2) return; 611 612 // Create a stream. 613 final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished, 614 inFinished, priority, headerBlock); 615 lastGoodStreamId = streamId; 616 streams.put(streamId, newStream); 617 executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { 618 @Override public void execute() { 619 try { 620 handler.receive(newStream); 621 } catch (IOException e) { 622 throw new RuntimeException(e); 623 } 624 } 625 }); 626 return; 627 } 628 } 629 630 // The headers claim to be for a new stream, but we already have one. 631 if (headersMode.failIfStreamPresent()) { 632 stream.closeLater(ErrorCode.PROTOCOL_ERROR); 633 removeStream(streamId); 634 return; 635 } 636 637 // Update an existing stream. 638 stream.receiveHeaders(headerBlock, headersMode); 639 if (inFinished) stream.receiveFin(); 640 } 641 642 @Override public void rstStream(int streamId, ErrorCode errorCode) { 643 if (pushedStream(streamId)) { 644 pushResetLater(streamId, errorCode); 645 return; 646 } 647 SpdyStream rstStream = removeStream(streamId); 648 if (rstStream != null) { 649 rstStream.receiveRstStream(errorCode); 650 } 651 } 652 653 @Override public void settings(boolean clearPrevious, Settings newSettings) { 654 long delta = 0; 655 SpdyStream[] streamsToNotify = null; 656 synchronized (SpdyConnection.this) { 657 int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 658 if (clearPrevious) peerSettings.clear(); 659 peerSettings.merge(newSettings); 660 if (getProtocol() == Protocol.HTTP_2) { 661 ackSettingsLater(); 662 } 663 int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 664 if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) { 665 delta = peerInitialWindowSize - priorWriteWindowSize; 666 if (!receivedInitialPeerSettings) { 667 addBytesToWriteWindow(delta); 668 receivedInitialPeerSettings = true; 669 } 670 if (!streams.isEmpty()) { 671 streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]); 672 } 673 } 674 } 675 if (streamsToNotify != null && delta != 0) { 676 for (SpdyStream stream : streams.values()) { 677 synchronized (stream) { 678 stream.addBytesToWriteWindow(delta); 679 } 680 } 681 } 682 } 683 684 private void ackSettingsLater() { 685 executor.submit(new NamedRunnable("OkHttp %s ACK Settings", hostName) { 686 @Override public void execute() { 687 try { 688 frameWriter.ackSettings(); 689 } catch (IOException ignored) { 690 } 691 } 692 }); 693 } 694 695 @Override public void ackSettings() { 696 // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT. 697 } 698 699 @Override public void ping(boolean reply, int payload1, int payload2) { 700 if (reply) { 701 Ping ping = removePing(payload1); 702 if (ping != null) { 703 ping.receive(); 704 } 705 } else { 706 // Send a reply to a client ping if this is a server and vice versa. 707 writePingLater(true, payload1, payload2, null); 708 } 709 } 710 711 @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { 712 if (debugData.size() > 0) { // TODO: log the debugData 713 } 714 synchronized (SpdyConnection.this) { 715 shutdown = true; 716 717 // Fail all streams created after the last good stream ID. 718 for (Iterator<Map.Entry<Integer, SpdyStream>> i = streams.entrySet().iterator(); 719 i.hasNext(); ) { 720 Map.Entry<Integer, SpdyStream> entry = i.next(); 721 int streamId = entry.getKey(); 722 if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) { 723 entry.getValue().receiveRstStream(ErrorCode.REFUSED_STREAM); 724 i.remove(); 725 } 726 } 727 } 728 } 729 730 @Override public void windowUpdate(int streamId, long windowSizeIncrement) { 731 if (streamId == 0) { 732 synchronized (SpdyConnection.this) { 733 bytesLeftInWriteWindow += windowSizeIncrement; 734 SpdyConnection.this.notifyAll(); 735 } 736 } else { 737 SpdyStream stream = getStream(streamId); 738 if (stream != null) { 739 synchronized (stream) { 740 stream.addBytesToWriteWindow(windowSizeIncrement); 741 } 742 } 743 } 744 } 745 746 @Override public void priority(int streamId, int priority) { 747 // TODO: honor priority. 748 } 749 750 @Override 751 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) { 752 pushRequestLater(promisedStreamId, requestHeaders); 753 } 754 } 755 756 /** Even, positive numbered streams are pushed streams in HTTP/2. */ 757 private boolean pushedStream(int streamId) { 758 return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0; 759 } 760 761 // Guarded by this. 762 private final Set<Integer> currentPushRequests = new LinkedHashSet<Integer>(); 763 764 private void pushRequestLater(final int streamId, final List<Header> requestHeaders) { 765 synchronized (this) { 766 if (currentPushRequests.contains(streamId)) { 767 writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR); 768 return; 769 } 770 currentPushRequests.add(streamId); 771 } 772 executor.submit(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) { 773 @Override public void execute() { 774 boolean cancel = pushObserver.onRequest(streamId, requestHeaders); 775 try { 776 if (cancel) { 777 frameWriter.rstStream(streamId, ErrorCode.CANCEL); 778 synchronized (SpdyConnection.this) { 779 currentPushRequests.remove(streamId); 780 } 781 } 782 } catch (IOException ignored) { 783 } 784 } 785 }); 786 } 787 788 private void pushHeadersLater(final int streamId, final List<Header> requestHeaders, 789 final boolean inFinished) { 790 executor.submit(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) { 791 @Override public void execute() { 792 boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished); 793 try { 794 if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); 795 if (cancel || inFinished) { 796 synchronized (SpdyConnection.this) { 797 currentPushRequests.remove(streamId); 798 } 799 } 800 } catch (IOException ignored) { 801 } 802 } 803 }); 804 } 805 806 /** 807 * Eagerly reads {@code byteCount} bytes from the source before launching a background task to 808 * process the data. This avoids corrupting the stream. 809 */ 810 private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount, 811 final boolean inFinished) throws IOException { 812 final OkBuffer buffer = new OkBuffer(); 813 source.require(byteCount); // Eagerly read the frame before firing client thread. 814 source.read(buffer, byteCount); 815 if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount); 816 executor.submit(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) { 817 @Override public void execute() { 818 try { 819 boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished); 820 if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); 821 if (cancel || inFinished) { 822 synchronized (SpdyConnection.this) { 823 currentPushRequests.remove(streamId); 824 } 825 } 826 } catch (IOException ignored) { 827 } 828 } 829 }); 830 } 831 832 private void pushResetLater(final int streamId, final ErrorCode errorCode) { 833 executor.submit(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) { 834 @Override public void execute() { 835 pushObserver.onReset(streamId, errorCode); 836 synchronized (SpdyConnection.this) { 837 currentPushRequests.remove(streamId); 838 } 839 } 840 }); 841 } 842 } 843