1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/flip/flip_stream.h" 6 7 #include "base/logging.h" 8 #include "net/flip/flip_session.h" 9 #include "net/http/http_request_info.h" 10 #include "net/http/http_response_info.h" 11 12 namespace net { 13 14 FlipStream::FlipStream(FlipSession* session, flip::FlipStreamId stream_id, 15 bool pushed, LoadLog* log) 16 : stream_id_(stream_id), 17 priority_(0), 18 pushed_(pushed), 19 download_finished_(false), 20 metrics_(Singleton<BandwidthMetrics>::get()), 21 session_(session), 22 response_(NULL), 23 request_body_stream_(NULL), 24 response_complete_(false), 25 io_state_(STATE_NONE), 26 response_status_(OK), 27 user_callback_(NULL), 28 user_buffer_(NULL), 29 user_buffer_len_(0), 30 cancelled_(false), 31 load_log_(log), 32 send_bytes_(0), 33 recv_bytes_(0), 34 histograms_recorded_(false) {} 35 36 FlipStream::~FlipStream() { 37 DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_; 38 39 // TODO(willchan): We're still calling CancelStream() too many times, because 40 // inactive pending/pushed streams will still have stream_id_ set. 41 if (stream_id_) { 42 session_->CancelStream(stream_id_); 43 } else if (!response_complete_) { 44 NOTREACHED(); 45 } 46 } 47 48 uint64 FlipStream::GetUploadProgress() const { 49 if (!request_body_stream_.get()) 50 return 0; 51 52 return request_body_stream_->position(); 53 } 54 55 const HttpResponseInfo* FlipStream::GetResponseInfo() const { 56 return response_; 57 } 58 59 int FlipStream::ReadResponseHeaders(CompletionCallback* callback) { 60 // Note: The FlipStream may have already received the response headers, so 61 // this call may complete synchronously. 62 CHECK(callback); 63 CHECK(io_state_ == STATE_NONE); 64 CHECK(!cancelled_); 65 66 // The SYN_REPLY has already been received. 67 if (response_->headers) 68 return OK; 69 70 io_state_ = STATE_READ_HEADERS; 71 CHECK(!user_callback_); 72 user_callback_ = callback; 73 return ERR_IO_PENDING; 74 } 75 76 int FlipStream::ReadResponseBody( 77 IOBuffer* buf, int buf_len, CompletionCallback* callback) { 78 DCHECK_EQ(io_state_, STATE_NONE); 79 CHECK(buf); 80 CHECK(buf_len); 81 CHECK(callback); 82 CHECK(!cancelled_); 83 84 // If we have data buffered, complete the IO immediately. 85 if (response_body_.size()) { 86 int bytes_read = 0; 87 while (response_body_.size() && buf_len > 0) { 88 scoped_refptr<IOBufferWithSize> data = response_body_.front(); 89 const int bytes_to_copy = std::min(buf_len, data->size()); 90 memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); 91 buf_len -= bytes_to_copy; 92 if (bytes_to_copy == data->size()) { 93 response_body_.pop_front(); 94 } else { 95 const int bytes_remaining = data->size() - bytes_to_copy; 96 IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); 97 memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), 98 bytes_remaining); 99 response_body_.pop_front(); 100 response_body_.push_front(new_buffer); 101 } 102 bytes_read += bytes_to_copy; 103 } 104 if (bytes_read > 0) 105 recv_bytes_ += bytes_read; 106 return bytes_read; 107 } else if (response_complete_) { 108 return response_status_; 109 } 110 111 CHECK(!user_callback_); 112 CHECK(!user_buffer_); 113 CHECK(user_buffer_len_ == 0); 114 115 user_callback_ = callback; 116 user_buffer_ = buf; 117 user_buffer_len_ = buf_len; 118 return ERR_IO_PENDING; 119 } 120 121 int FlipStream::SendRequest(UploadDataStream* upload_data, 122 HttpResponseInfo* response, 123 CompletionCallback* callback) { 124 CHECK(callback); 125 CHECK(!cancelled_); 126 CHECK(response); 127 128 response_ = response; 129 130 if (upload_data) { 131 if (upload_data->size()) 132 request_body_stream_.reset(upload_data); 133 else 134 delete upload_data; 135 } 136 137 send_time_ = base::TimeTicks::Now(); 138 139 DCHECK_EQ(io_state_, STATE_NONE); 140 if (!pushed_) 141 io_state_ = STATE_SEND_HEADERS; 142 else 143 io_state_ = STATE_READ_HEADERS; 144 int result = DoLoop(OK); 145 if (result == ERR_IO_PENDING) { 146 CHECK(!user_callback_); 147 user_callback_ = callback; 148 } 149 return result; 150 } 151 152 void FlipStream::Cancel() { 153 cancelled_ = true; 154 user_callback_ = NULL; 155 156 session_->CancelStream(stream_id_); 157 } 158 159 void FlipStream::OnResponseReceived(const HttpResponseInfo& response) { 160 metrics_.StartStream(); 161 162 CHECK(!response_->headers); 163 164 *response_ = response; // TODO(mbelshe): avoid copy. 165 DCHECK(response_->headers); 166 167 recv_first_byte_time_ = base::TimeTicks::Now(); 168 169 if (io_state_ == STATE_NONE) { 170 CHECK(pushed_); 171 } else if (io_state_ == STATE_READ_HEADERS_COMPLETE) { 172 CHECK(!pushed_); 173 } else { 174 NOTREACHED(); 175 } 176 177 int rv = DoLoop(OK); 178 179 if (user_callback_) 180 DoCallback(rv); 181 } 182 183 bool FlipStream::OnDataReceived(const char* data, int length) { 184 DCHECK_GE(length, 0); 185 LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for " 186 << stream_id_; 187 188 // If we don't have a response, then the SYN_REPLY did not come through. 189 // We cannot pass data up to the caller unless the reply headers have been 190 // received. 191 if (!response_->headers) { 192 OnClose(ERR_SYN_REPLY_NOT_RECEIVED); 193 return false; 194 } 195 196 if (length > 0) 197 recv_bytes_ += length; 198 recv_last_byte_time_ = base::TimeTicks::Now(); 199 200 // A zero-length read means that the stream is being closed. 201 if (!length) { 202 metrics_.StopStream(); 203 download_finished_ = true; 204 OnClose(net::OK); 205 return true; 206 } 207 208 // Track our bandwidth. 209 metrics_.RecordBytes(length); 210 211 if (length > 0) { 212 // TODO(mbelshe): If read is pending, we should copy the data straight into 213 // the read buffer here. For now, we'll queue it always. 214 // TODO(mbelshe): We need to have some throttling on this. We shouldn't 215 // buffer an infinite amount of data. 216 217 IOBufferWithSize* io_buffer = new IOBufferWithSize(length); 218 memcpy(io_buffer->data(), data, length); 219 220 response_body_.push_back(io_buffer); 221 } 222 223 // Note that data may be received for a FlipStream prior to the user calling 224 // ReadResponseBody(), therefore user_callback_ may be NULL. This may often 225 // happen for server initiated streams. 226 if (user_callback_) { 227 int rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); 228 CHECK(rv != ERR_IO_PENDING); 229 user_buffer_ = NULL; 230 user_buffer_len_ = 0; 231 DoCallback(rv); 232 } 233 234 return true; 235 } 236 237 void FlipStream::OnWriteComplete(int status) { 238 // TODO(mbelshe): Check for cancellation here. If we're cancelled, we 239 // should discontinue the DoLoop. 240 241 if (status > 0) 242 send_bytes_ += status; 243 244 DoLoop(status); 245 } 246 247 void FlipStream::OnClose(int status) { 248 response_complete_ = true; 249 response_status_ = status; 250 stream_id_ = 0; 251 252 if (user_callback_) 253 DoCallback(status); 254 255 UpdateHistograms(); 256 } 257 258 int FlipStream::DoLoop(int result) { 259 do { 260 State state = io_state_; 261 io_state_ = STATE_NONE; 262 switch (state) { 263 // State machine 1: Send headers and wait for response headers. 264 case STATE_SEND_HEADERS: 265 CHECK(result == OK); 266 LoadLog::BeginEvent(load_log_, 267 LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS); 268 result = DoSendHeaders(); 269 break; 270 case STATE_SEND_HEADERS_COMPLETE: 271 LoadLog::EndEvent(load_log_, 272 LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS); 273 result = DoSendHeadersComplete(result); 274 break; 275 case STATE_SEND_BODY: 276 CHECK(result == OK); 277 LoadLog::BeginEvent(load_log_, 278 LoadLog::TYPE_FLIP_STREAM_SEND_BODY); 279 result = DoSendBody(); 280 break; 281 case STATE_SEND_BODY_COMPLETE: 282 LoadLog::EndEvent(load_log_, 283 LoadLog::TYPE_FLIP_STREAM_SEND_BODY); 284 result = DoSendBodyComplete(result); 285 break; 286 case STATE_READ_HEADERS: 287 CHECK(result == OK); 288 LoadLog::BeginEvent(load_log_, 289 LoadLog::TYPE_FLIP_STREAM_READ_HEADERS); 290 result = DoReadHeaders(); 291 break; 292 case STATE_READ_HEADERS_COMPLETE: 293 LoadLog::EndEvent(load_log_, 294 LoadLog::TYPE_FLIP_STREAM_READ_HEADERS); 295 result = DoReadHeadersComplete(result); 296 break; 297 298 // State machine 2: Read body. 299 // NOTE(willchan): Currently unused. Currently we handle this stuff in 300 // the OnDataReceived()/OnClose()/ReadResponseHeaders()/etc. Only reason 301 // to do this is for consistency with the Http code. 302 case STATE_READ_BODY: 303 LoadLog::BeginEvent(load_log_, 304 LoadLog::TYPE_FLIP_STREAM_READ_BODY); 305 result = DoReadBody(); 306 break; 307 case STATE_READ_BODY_COMPLETE: 308 LoadLog::EndEvent(load_log_, 309 LoadLog::TYPE_FLIP_STREAM_READ_BODY); 310 result = DoReadBodyComplete(result); 311 break; 312 case STATE_DONE: 313 DCHECK(result != ERR_IO_PENDING); 314 break; 315 default: 316 NOTREACHED(); 317 break; 318 } 319 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE); 320 321 return result; 322 } 323 324 void FlipStream::DoCallback(int rv) { 325 CHECK(rv != ERR_IO_PENDING); 326 CHECK(user_callback_); 327 328 // Since Run may result in being called back, clear user_callback_ in advance. 329 CompletionCallback* c = user_callback_; 330 user_callback_ = NULL; 331 c->Run(rv); 332 } 333 334 int FlipStream::DoSendHeaders() { 335 // The FlipSession will always call us back when the send is complete. 336 // TODO(willchan): This code makes the assumption that for the non-push stream 337 // case, the client code calls SendRequest() after creating the stream and 338 // before yielding back to the MessageLoop. This is true in the current code, 339 // but is not obvious from the headers. We should make the code handle 340 // SendRequest() being called after the SYN_REPLY has been received. 341 io_state_ = STATE_SEND_HEADERS_COMPLETE; 342 return ERR_IO_PENDING; 343 } 344 345 int FlipStream::DoSendHeadersComplete(int result) { 346 if (result < 0) 347 return result; 348 349 CHECK(result > 0); 350 351 // There is no body, skip that state. 352 if (!request_body_stream_.get()) { 353 io_state_ = STATE_READ_HEADERS; 354 return OK; 355 } 356 357 io_state_ = STATE_SEND_BODY; 358 return OK; 359 } 360 361 // DoSendBody is called to send the optional body for the request. This call 362 // will also be called as each write of a chunk of the body completes. 363 int FlipStream::DoSendBody() { 364 // If we're already in the STATE_SENDING_BODY state, then we've already 365 // sent a portion of the body. In that case, we need to first consume 366 // the bytes written in the body stream. Note that the bytes written is 367 // the number of bytes in the frame that were written, only consume the 368 // data portion, of course. 369 io_state_ = STATE_SEND_BODY_COMPLETE; 370 int buf_len = static_cast<int>(request_body_stream_->buf_len()); 371 return session_->WriteStreamData(stream_id_, 372 request_body_stream_->buf(), 373 buf_len); 374 } 375 376 int FlipStream::DoSendBodyComplete(int result) { 377 if (result < 0) 378 return result; 379 380 CHECK(result != 0); 381 382 request_body_stream_->DidConsume(result); 383 384 if (request_body_stream_->position() < request_body_stream_->size()) 385 io_state_ = STATE_SEND_BODY; 386 else 387 io_state_ = STATE_READ_HEADERS; 388 389 return OK; 390 } 391 392 int FlipStream::DoReadHeaders() { 393 io_state_ = STATE_READ_HEADERS_COMPLETE; 394 return response_->headers ? OK : ERR_IO_PENDING; 395 } 396 397 int FlipStream::DoReadHeadersComplete(int result) { 398 return result; 399 } 400 401 int FlipStream::DoReadBody() { 402 // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this 403 // makes sense. 404 return ERR_IO_PENDING; 405 } 406 407 int FlipStream::DoReadBodyComplete(int result) { 408 // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this 409 // makes sense. 410 return ERR_IO_PENDING; 411 } 412 413 void FlipStream::UpdateHistograms() { 414 if (histograms_recorded_) 415 return; 416 417 histograms_recorded_ = true; 418 419 // We need all timers to be filled in, otherwise metrics can be bogus. 420 if (send_time_.is_null() || recv_first_byte_time_.is_null() || 421 recv_last_byte_time_.is_null()) 422 return; 423 424 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte", 425 recv_first_byte_time_ - send_time_); 426 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", 427 recv_last_byte_time_ - recv_first_byte_time_); 428 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", 429 recv_last_byte_time_ - send_time_); 430 431 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); 432 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); 433 } 434 435 } // namespace net 436