1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.squareup.okhttp.internal.spdy; 18 19 import java.io.EOFException; 20 import java.io.IOException; 21 import java.io.InterruptedIOException; 22 import java.net.SocketTimeoutException; 23 import java.util.ArrayList; 24 import java.util.List; 25 import okio.BufferedSource; 26 import okio.Deadline; 27 import okio.OkBuffer; 28 import okio.Sink; 29 import okio.Source; 30 31 import static com.squareup.okhttp.internal.spdy.Settings.DEFAULT_INITIAL_WINDOW_SIZE; 32 33 /** A logical bidirectional stream. */ 34 public final class SpdyStream { 35 // Internal state is guarded by this. No long-running or potentially 36 // blocking operations are performed while the lock is held. 37 38 /** 39 * The total number of bytes consumed by the application (with {@link 40 * SpdyDataSource#read}), but not yet acknowledged by sending a {@code 41 * WINDOW_UPDATE} frame on this stream. 42 */ 43 // Visible for testing 44 long unacknowledgedBytesRead = 0; 45 46 /** 47 * Count of bytes that can be written on the stream before receiving a 48 * window update. Even if this is positive, writes will block until there 49 * available bytes in {@code connection.bytesLeftInWriteWindow}. 50 */ 51 // guarded by this 52 long bytesLeftInWriteWindow; 53 54 private final int id; 55 private final SpdyConnection connection; 56 private final int priority; 57 private long readTimeoutMillis = 0; 58 59 /** Headers sent by the stream initiator. Immutable and non null. */ 60 private final List<Header> requestHeaders; 61 62 /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */ 63 private List<Header> responseHeaders; 64 65 private final SpdyDataSource source; 66 final SpdyDataSink sink; 67 68 /** 69 * The reason why this stream was abnormally closed. If there are multiple 70 * reasons to abnormally close this stream (such as both peers closing it 71 * near-simultaneously) then this is the first reason known to this peer. 72 */ 73 private ErrorCode errorCode = null; 74 75 SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished, 76 int priority, List<Header> requestHeaders) { 77 if (connection == null) throw new NullPointerException("connection == null"); 78 if (requestHeaders == null) throw new NullPointerException("requestHeaders == null"); 79 this.id = id; 80 this.connection = connection; 81 this.bytesLeftInWriteWindow = 82 connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 83 this.source = new SpdyDataSource( 84 connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE)); 85 this.sink = new SpdyDataSink(); 86 this.source.finished = inFinished; 87 this.sink.finished = outFinished; 88 this.priority = priority; 89 this.requestHeaders = requestHeaders; 90 } 91 92 public int getId() { 93 return id; 94 } 95 96 /** 97 * Returns true if this stream is open. A stream is open until either: 98 * <ul> 99 * <li>A {@code SYN_RESET} frame abnormally terminates the stream. 100 * <li>Both input and output streams have transmitted all data and 101 * headers. 102 * </ul> 103 * Note that the input stream may continue to yield data even after a stream 104 * reports itself as not open. This is because input data is buffered. 105 */ 106 public synchronized boolean isOpen() { 107 if (errorCode != null) { 108 return false; 109 } 110 if ((source.finished || source.closed) 111 && (sink.finished || sink.closed) 112 && responseHeaders != null) { 113 return false; 114 } 115 return true; 116 } 117 118 /** Returns true if this stream was created by this peer. */ 119 public boolean isLocallyInitiated() { 120 boolean streamIsClient = ((id & 1) == 1); 121 return connection.client == streamIsClient; 122 } 123 124 public SpdyConnection getConnection() { 125 return connection; 126 } 127 128 public List<Header> getRequestHeaders() { 129 return requestHeaders; 130 } 131 132 /** 133 * Returns the stream's response headers, blocking if necessary if they 134 * have not been received yet. 135 */ 136 public synchronized List<Header> getResponseHeaders() throws IOException { 137 long remaining = 0; 138 long start = 0; 139 if (readTimeoutMillis != 0) { 140 start = (System.nanoTime() / 1000000); 141 remaining = readTimeoutMillis; 142 } 143 try { 144 while (responseHeaders == null && errorCode == null) { 145 if (readTimeoutMillis == 0) { // No timeout configured. 146 wait(); 147 } else if (remaining > 0) { 148 wait(remaining); 149 remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000); 150 } else { 151 throw new SocketTimeoutException("Read response header timeout. readTimeoutMillis: " 152 + readTimeoutMillis); 153 } 154 } 155 if (responseHeaders != null) { 156 return responseHeaders; 157 } 158 throw new IOException("stream was reset: " + errorCode); 159 } catch (InterruptedException e) { 160 InterruptedIOException rethrow = new InterruptedIOException(); 161 rethrow.initCause(e); 162 throw rethrow; 163 } 164 } 165 166 /** 167 * Returns the reason why this stream was closed, or null if it closed 168 * normally or has not yet been closed. 169 */ 170 public synchronized ErrorCode getErrorCode() { 171 return errorCode; 172 } 173 174 /** 175 * Sends a reply to an incoming stream. 176 * 177 * @param out true to create an output stream that we can use to send data 178 * to the remote peer. Corresponds to {@code FLAG_FIN}. 179 */ 180 public void reply(List<Header> responseHeaders, boolean out) throws IOException { 181 assert (!Thread.holdsLock(SpdyStream.this)); 182 boolean outFinished = false; 183 synchronized (this) { 184 if (responseHeaders == null) { 185 throw new NullPointerException("responseHeaders == null"); 186 } 187 if (this.responseHeaders != null) { 188 throw new IllegalStateException("reply already sent"); 189 } 190 this.responseHeaders = responseHeaders; 191 if (!out) { 192 this.sink.finished = true; 193 outFinished = true; 194 } 195 } 196 connection.writeSynReply(id, outFinished, responseHeaders); 197 198 if (outFinished) { 199 connection.flush(); 200 } 201 } 202 203 /** 204 * Sets the maximum time to wait on input stream reads before failing with a 205 * {@code SocketTimeoutException}, or {@code 0} to wait indefinitely. 206 */ 207 public void setReadTimeout(long readTimeoutMillis) { 208 this.readTimeoutMillis = readTimeoutMillis; 209 } 210 211 public long getReadTimeoutMillis() { 212 return readTimeoutMillis; 213 } 214 215 /** Returns a source that reads data from the peer. */ 216 public Source getSource() { 217 return source; 218 } 219 220 /** 221 * Returns a sink that can be used to write data to the peer. 222 * 223 * @throws IllegalStateException if this stream was initiated by the peer 224 * and a {@link #reply} has not yet been sent. 225 */ 226 public Sink getSink() { 227 synchronized (this) { 228 if (responseHeaders == null && !isLocallyInitiated()) { 229 throw new IllegalStateException("reply before requesting the sink"); 230 } 231 } 232 return sink; 233 } 234 235 /** 236 * Abnormally terminate this stream. This blocks until the {@code RST_STREAM} 237 * frame has been transmitted. 238 */ 239 public void close(ErrorCode rstStatusCode) throws IOException { 240 if (!closeInternal(rstStatusCode)) { 241 return; // Already closed. 242 } 243 connection.writeSynReset(id, rstStatusCode); 244 } 245 246 /** 247 * Abnormally terminate this stream. This enqueues a {@code RST_STREAM} 248 * frame and returns immediately. 249 */ 250 public void closeLater(ErrorCode errorCode) { 251 if (!closeInternal(errorCode)) { 252 return; // Already closed. 253 } 254 connection.writeSynResetLater(id, errorCode); 255 } 256 257 /** Returns true if this stream was closed. */ 258 private boolean closeInternal(ErrorCode errorCode) { 259 assert (!Thread.holdsLock(this)); 260 synchronized (this) { 261 if (this.errorCode != null) { 262 return false; 263 } 264 if (source.finished && sink.finished) { 265 return false; 266 } 267 this.errorCode = errorCode; 268 notifyAll(); 269 } 270 connection.removeStream(id); 271 return true; 272 } 273 274 void receiveHeaders(List<Header> headers, HeadersMode headersMode) { 275 assert (!Thread.holdsLock(SpdyStream.this)); 276 ErrorCode errorCode = null; 277 boolean open = true; 278 synchronized (this) { 279 if (responseHeaders == null) { 280 if (headersMode.failIfHeadersAbsent()) { 281 errorCode = ErrorCode.PROTOCOL_ERROR; 282 } else { 283 responseHeaders = headers; 284 open = isOpen(); 285 notifyAll(); 286 } 287 } else { 288 if (headersMode.failIfHeadersPresent()) { 289 errorCode = ErrorCode.STREAM_IN_USE; 290 } else { 291 List<Header> newHeaders = new ArrayList<Header>(); 292 newHeaders.addAll(responseHeaders); 293 newHeaders.addAll(headers); 294 this.responseHeaders = newHeaders; 295 } 296 } 297 } 298 if (errorCode != null) { 299 closeLater(errorCode); 300 } else if (!open) { 301 connection.removeStream(id); 302 } 303 } 304 305 void receiveData(BufferedSource in, int length) throws IOException { 306 assert (!Thread.holdsLock(SpdyStream.this)); 307 this.source.receive(in, length); 308 } 309 310 void receiveFin() { 311 assert (!Thread.holdsLock(SpdyStream.this)); 312 boolean open; 313 synchronized (this) { 314 this.source.finished = true; 315 open = isOpen(); 316 notifyAll(); 317 } 318 if (!open) { 319 connection.removeStream(id); 320 } 321 } 322 323 synchronized void receiveRstStream(ErrorCode errorCode) { 324 if (this.errorCode == null) { 325 this.errorCode = errorCode; 326 notifyAll(); 327 } 328 } 329 330 int getPriority() { 331 return priority; 332 } 333 334 /** 335 * A source that reads the incoming data frames of a stream. Although this 336 * class uses synchronization to safely receive incoming data frames, it is 337 * not intended for use by multiple readers. 338 */ 339 private final class SpdyDataSource implements Source { 340 /** Buffer to receive data from the network into. Only accessed by the reader thread. */ 341 private final OkBuffer receiveBuffer = new OkBuffer(); 342 343 /** Buffer with readable data. Guarded by SpdyStream.this. */ 344 private final OkBuffer readBuffer = new OkBuffer(); 345 346 /** Maximum number of bytes to buffer before reporting a flow control error. */ 347 private final long maxByteCount; 348 349 /** True if the caller has closed this stream. */ 350 private boolean closed; 351 352 /** 353 * True if either side has cleanly shut down this stream. We will 354 * receive no more bytes beyond those already in the buffer. 355 */ 356 private boolean finished; 357 358 private SpdyDataSource(long maxByteCount) { 359 this.maxByteCount = maxByteCount; 360 } 361 362 @Override public long read(OkBuffer sink, long byteCount) 363 throws IOException { 364 if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); 365 366 long read; 367 synchronized (SpdyStream.this) { 368 waitUntilReadable(); 369 checkNotClosed(); 370 if (readBuffer.size() == 0) return -1; // This source is exhausted. 371 372 // Move bytes from the read buffer into the caller's buffer. 373 read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size())); 374 375 // Flow control: notify the peer that we're ready for more data! 376 unacknowledgedBytesRead += read; 377 if (unacknowledgedBytesRead 378 >= connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) { 379 connection.writeWindowUpdateLater(id, unacknowledgedBytesRead); 380 unacknowledgedBytesRead = 0; 381 } 382 } 383 384 // Update connection.unacknowledgedBytesRead outside the stream lock. 385 synchronized (connection) { // Multiple application threads may hit this section. 386 connection.unacknowledgedBytesRead += read; 387 if (connection.unacknowledgedBytesRead 388 >= connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) { 389 connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead); 390 connection.unacknowledgedBytesRead = 0; 391 } 392 } 393 394 return read; 395 } 396 397 /** 398 * Returns once the input stream is either readable or finished. Throws 399 * a {@link SocketTimeoutException} if the read timeout elapses before 400 * that happens. 401 */ 402 private void waitUntilReadable() throws IOException { 403 long start = 0; 404 long remaining = 0; 405 if (readTimeoutMillis != 0) { 406 start = (System.nanoTime() / 1000000); 407 remaining = readTimeoutMillis; 408 } 409 try { 410 while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) { 411 if (readTimeoutMillis == 0) { 412 SpdyStream.this.wait(); 413 } else if (remaining > 0) { 414 SpdyStream.this.wait(remaining); 415 remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000); 416 } else { 417 throw new SocketTimeoutException("Read timed out"); 418 } 419 } 420 } catch (InterruptedException e) { 421 throw new InterruptedIOException(); 422 } 423 } 424 425 void receive(BufferedSource in, long byteCount) throws IOException { 426 assert (!Thread.holdsLock(SpdyStream.this)); 427 428 while (byteCount > 0) { 429 boolean finished; 430 boolean flowControlError; 431 synchronized (SpdyStream.this) { 432 finished = this.finished; 433 flowControlError = byteCount + readBuffer.size() > maxByteCount; 434 } 435 436 // If the peer sends more data than we can handle, discard it and close the connection. 437 if (flowControlError) { 438 in.skip(byteCount); 439 closeLater(ErrorCode.FLOW_CONTROL_ERROR); 440 return; 441 } 442 443 // Discard data received after the stream is finished. It's probably a benign race. 444 if (finished) { 445 in.skip(byteCount); 446 return; 447 } 448 449 // Fill the receive buffer without holding any locks. 450 long read = in.read(receiveBuffer, byteCount); 451 if (read == -1) throw new EOFException(); 452 byteCount -= read; 453 454 // Move the received data to the read buffer to the reader can read it. 455 synchronized (SpdyStream.this) { 456 boolean wasEmpty = readBuffer.size() == 0; 457 readBuffer.write(receiveBuffer, receiveBuffer.size()); 458 if (wasEmpty) { 459 SpdyStream.this.notifyAll(); 460 } 461 } 462 } 463 } 464 465 @Override public Source deadline(Deadline deadline) { 466 // TODO: honor deadlines. 467 return this; 468 } 469 470 @Override public void close() throws IOException { 471 synchronized (SpdyStream.this) { 472 closed = true; 473 readBuffer.clear(); 474 SpdyStream.this.notifyAll(); 475 } 476 cancelStreamIfNecessary(); 477 } 478 479 private void checkNotClosed() throws IOException { 480 if (closed) { 481 throw new IOException("stream closed"); 482 } 483 if (errorCode != null) { 484 throw new IOException("stream was reset: " + errorCode); 485 } 486 } 487 } 488 489 private void cancelStreamIfNecessary() throws IOException { 490 assert (!Thread.holdsLock(SpdyStream.this)); 491 boolean open; 492 boolean cancel; 493 synchronized (this) { 494 cancel = !source.finished && source.closed && (sink.finished || sink.closed); 495 open = isOpen(); 496 } 497 if (cancel) { 498 // RST this stream to prevent additional data from being sent. This 499 // is safe because the input stream is closed (we won't use any 500 // further bytes) and the output stream is either finished or closed 501 // (so RSTing both streams doesn't cause harm). 502 SpdyStream.this.close(ErrorCode.CANCEL); 503 } else if (!open) { 504 connection.removeStream(id); 505 } 506 } 507 508 /** 509 * A sink that writes outgoing data frames of a stream. This class is not 510 * thread safe. 511 */ 512 final class SpdyDataSink implements Sink { 513 private boolean closed; 514 515 /** 516 * True if either side has cleanly shut down this stream. We shall send 517 * no more bytes. 518 */ 519 private boolean finished; 520 521 @Override public void write(OkBuffer source, long byteCount) throws IOException { 522 assert (!Thread.holdsLock(SpdyStream.this)); 523 while (byteCount > 0) { 524 long toWrite; 525 synchronized (SpdyStream.this) { 526 try { 527 while (bytesLeftInWriteWindow <= 0) { 528 SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE. 529 } 530 } catch (InterruptedException e) { 531 throw new InterruptedIOException(); 532 } 533 534 checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. 535 toWrite = Math.min(bytesLeftInWriteWindow, byteCount); 536 bytesLeftInWriteWindow -= toWrite; 537 } 538 539 byteCount -= toWrite; 540 connection.writeData(id, false, source, toWrite); 541 } 542 } 543 544 @Override public void flush() throws IOException { 545 assert (!Thread.holdsLock(SpdyStream.this)); 546 synchronized (SpdyStream.this) { 547 checkOutNotClosed(); 548 } 549 connection.flush(); 550 } 551 552 @Override public Sink deadline(Deadline deadline) { 553 // TODO: honor deadlines. 554 return this; 555 } 556 557 @Override public void close() throws IOException { 558 assert (!Thread.holdsLock(SpdyStream.this)); 559 synchronized (SpdyStream.this) { 560 if (closed) return; 561 } 562 if (!sink.finished) { 563 connection.writeData(id, true, null, 0); 564 } 565 synchronized (SpdyStream.this) { 566 closed = true; 567 } 568 connection.flush(); 569 cancelStreamIfNecessary(); 570 } 571 } 572 573 /** 574 * {@code delta} will be negative if a settings frame initial window is 575 * smaller than the last. 576 */ 577 void addBytesToWriteWindow(long delta) { 578 bytesLeftInWriteWindow += delta; 579 if (delta > 0) SpdyStream.this.notifyAll(); 580 } 581 582 private void checkOutNotClosed() throws IOException { 583 if (sink.closed) { 584 throw new IOException("stream closed"); 585 } else if (sink.finished) { 586 throw new IOException("stream finished"); 587 } else if (errorCode != null) { 588 throw new IOException("stream was reset: " + errorCode); 589 } 590 } 591 } 592