1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/websockets/websocket_job.h" 6 7 #include <algorithm> 8 9 #include "base/bind.h" 10 #include "base/lazy_instance.h" 11 #include "net/base/io_buffer.h" 12 #include "net/base/net_errors.h" 13 #include "net/base/net_log.h" 14 #include "net/cookies/cookie_store.h" 15 #include "net/http/http_network_session.h" 16 #include "net/http/http_transaction_factory.h" 17 #include "net/http/http_util.h" 18 #include "net/spdy/spdy_session.h" 19 #include "net/spdy/spdy_session_pool.h" 20 #include "net/url_request/url_request_context.h" 21 #include "net/websockets/websocket_handshake_handler.h" 22 #include "net/websockets/websocket_net_log_params.h" 23 #include "net/websockets/websocket_throttle.h" 24 #include "url/gurl.h" 25 26 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes. 27 28 namespace { 29 30 // lower-case header names. 31 const char* const kCookieHeaders[] = { 32 "cookie", "cookie2" 33 }; 34 const char* const kSetCookieHeaders[] = { 35 "set-cookie", "set-cookie2" 36 }; 37 38 net::SocketStreamJob* WebSocketJobFactory( 39 const GURL& url, net::SocketStream::Delegate* delegate, 40 net::URLRequestContext* context, net::CookieStore* cookie_store) { 41 net::WebSocketJob* job = new net::WebSocketJob(delegate); 42 job->InitSocketStream(new net::SocketStream(url, job, context, cookie_store)); 43 return job; 44 } 45 46 class WebSocketJobInitSingleton { 47 private: 48 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>; 49 WebSocketJobInitSingleton() { 50 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory); 51 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory); 52 } 53 }; 54 55 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init = 56 LAZY_INSTANCE_INITIALIZER; 57 58 } // anonymous namespace 59 60 namespace net { 61 62 // static 63 void WebSocketJob::EnsureInit() { 64 g_websocket_job_init.Get(); 65 } 66 67 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) 68 : delegate_(delegate), 69 state_(INITIALIZED), 70 waiting_(false), 71 handshake_request_(new WebSocketHandshakeRequestHandler), 72 handshake_response_(new WebSocketHandshakeResponseHandler), 73 started_to_send_handshake_request_(false), 74 handshake_request_sent_(0), 75 response_cookies_save_index_(0), 76 spdy_protocol_version_(0), 77 save_next_cookie_running_(false), 78 callback_pending_(false), 79 weak_ptr_factory_(this), 80 weak_ptr_factory_for_send_pending_(this) { 81 } 82 83 WebSocketJob::~WebSocketJob() { 84 DCHECK_EQ(CLOSED, state_); 85 DCHECK(!delegate_); 86 DCHECK(!socket_.get()); 87 } 88 89 void WebSocketJob::Connect() { 90 DCHECK(socket_.get()); 91 DCHECK_EQ(state_, INITIALIZED); 92 state_ = CONNECTING; 93 socket_->Connect(); 94 } 95 96 bool WebSocketJob::SendData(const char* data, int len) { 97 switch (state_) { 98 case INITIALIZED: 99 return false; 100 101 case CONNECTING: 102 return SendHandshakeRequest(data, len); 103 104 case OPEN: 105 { 106 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len); 107 memcpy(buffer->data(), data, len); 108 if (current_send_buffer_.get() || !send_buffer_queue_.empty()) { 109 send_buffer_queue_.push_back(buffer); 110 return true; 111 } 112 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len); 113 return SendDataInternal(current_send_buffer_->data(), 114 current_send_buffer_->BytesRemaining()); 115 } 116 117 case CLOSING: 118 case CLOSED: 119 return false; 120 } 121 return false; 122 } 123 124 void WebSocketJob::Close() { 125 if (state_ == CLOSED) 126 return; 127 128 state_ = CLOSING; 129 if (current_send_buffer_.get()) { 130 // Will close in SendPending. 131 return; 132 } 133 state_ = CLOSED; 134 CloseInternal(); 135 } 136 137 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) { 138 state_ = CONNECTING; 139 socket_->RestartWithAuth(credentials); 140 } 141 142 void WebSocketJob::DetachDelegate() { 143 state_ = CLOSED; 144 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 145 146 scoped_refptr<WebSocketJob> protect(this); 147 weak_ptr_factory_.InvalidateWeakPtrs(); 148 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs(); 149 150 delegate_ = NULL; 151 if (socket_.get()) 152 socket_->DetachDelegate(); 153 socket_ = NULL; 154 if (!callback_.is_null()) { 155 waiting_ = false; 156 callback_.Reset(); 157 Release(); // Balanced with OnStartOpenConnection(). 158 } 159 } 160 161 int WebSocketJob::OnStartOpenConnection( 162 SocketStream* socket, const CompletionCallback& callback) { 163 DCHECK(callback_.is_null()); 164 state_ = CONNECTING; 165 166 addresses_ = socket->address_list(); 167 if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) { 168 return ERR_WS_THROTTLE_QUEUE_TOO_LARGE; 169 } 170 171 if (delegate_) { 172 int result = delegate_->OnStartOpenConnection(socket, callback); 173 DCHECK_EQ(OK, result); 174 } 175 if (waiting_) { 176 // PutInQueue() may set |waiting_| true for throttling. In this case, 177 // Wakeup() will be called later. 178 callback_ = callback; 179 AddRef(); // Balanced when callback_ is cleared. 180 return ERR_IO_PENDING; 181 } 182 return TrySpdyStream(); 183 } 184 185 void WebSocketJob::OnConnected( 186 SocketStream* socket, int max_pending_send_allowed) { 187 if (state_ == CLOSED) 188 return; 189 DCHECK_EQ(CONNECTING, state_); 190 if (delegate_) 191 delegate_->OnConnected(socket, max_pending_send_allowed); 192 } 193 194 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) { 195 DCHECK_NE(INITIALIZED, state_); 196 DCHECK_GT(amount_sent, 0); 197 if (state_ == CLOSED) 198 return; 199 if (state_ == CONNECTING) { 200 OnSentHandshakeRequest(socket, amount_sent); 201 return; 202 } 203 if (delegate_) { 204 DCHECK(state_ == OPEN || state_ == CLOSING); 205 if (!current_send_buffer_.get()) { 206 VLOG(1) 207 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent; 208 return; 209 } 210 current_send_buffer_->DidConsume(amount_sent); 211 if (current_send_buffer_->BytesRemaining() > 0) 212 return; 213 214 // We need to report amount_sent of original buffer size, instead of 215 // amount sent to |socket|. 216 amount_sent = current_send_buffer_->size(); 217 DCHECK_GT(amount_sent, 0); 218 current_send_buffer_ = NULL; 219 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) { 220 base::MessageLoopForIO::current()->PostTask( 221 FROM_HERE, 222 base::Bind(&WebSocketJob::SendPending, 223 weak_ptr_factory_for_send_pending_.GetWeakPtr())); 224 } 225 delegate_->OnSentData(socket, amount_sent); 226 } 227 } 228 229 void WebSocketJob::OnReceivedData( 230 SocketStream* socket, const char* data, int len) { 231 DCHECK_NE(INITIALIZED, state_); 232 if (state_ == CLOSED) 233 return; 234 if (state_ == CONNECTING) { 235 OnReceivedHandshakeResponse(socket, data, len); 236 return; 237 } 238 DCHECK(state_ == OPEN || state_ == CLOSING); 239 if (delegate_ && len > 0) 240 delegate_->OnReceivedData(socket, data, len); 241 } 242 243 void WebSocketJob::OnClose(SocketStream* socket) { 244 state_ = CLOSED; 245 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 246 247 scoped_refptr<WebSocketJob> protect(this); 248 weak_ptr_factory_.InvalidateWeakPtrs(); 249 250 SocketStream::Delegate* delegate = delegate_; 251 delegate_ = NULL; 252 socket_ = NULL; 253 if (!callback_.is_null()) { 254 waiting_ = false; 255 callback_.Reset(); 256 Release(); // Balanced with OnStartOpenConnection(). 257 } 258 if (delegate) 259 delegate->OnClose(socket); 260 } 261 262 void WebSocketJob::OnAuthRequired( 263 SocketStream* socket, AuthChallengeInfo* auth_info) { 264 if (delegate_) 265 delegate_->OnAuthRequired(socket, auth_info); 266 } 267 268 void WebSocketJob::OnSSLCertificateError( 269 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) { 270 if (delegate_) 271 delegate_->OnSSLCertificateError(socket, ssl_info, fatal); 272 } 273 274 void WebSocketJob::OnError(const SocketStream* socket, int error) { 275 if (delegate_ && error != ERR_PROTOCOL_SWITCHED) 276 delegate_->OnError(socket, error); 277 } 278 279 void WebSocketJob::OnCreatedSpdyStream(int result) { 280 DCHECK(spdy_websocket_stream_.get()); 281 DCHECK(socket_.get()); 282 DCHECK_NE(ERR_IO_PENDING, result); 283 284 if (state_ == CLOSED) { 285 result = ERR_ABORTED; 286 } else if (result == OK) { 287 state_ = CONNECTING; 288 result = ERR_PROTOCOL_SWITCHED; 289 } else { 290 spdy_websocket_stream_.reset(); 291 } 292 293 CompleteIO(result); 294 } 295 296 void WebSocketJob::OnSentSpdyHeaders() { 297 DCHECK_NE(INITIALIZED, state_); 298 if (state_ != CONNECTING) 299 return; 300 size_t original_length = handshake_request_->original_length(); 301 handshake_request_.reset(); 302 if (delegate_) 303 delegate_->OnSentData(socket_.get(), original_length); 304 } 305 306 void WebSocketJob::OnSpdyResponseHeadersUpdated( 307 const SpdyHeaderBlock& response_headers) { 308 DCHECK_NE(INITIALIZED, state_); 309 if (state_ != CONNECTING) 310 return; 311 // TODO(toyoshim): Fallback to non-spdy connection? 312 handshake_response_->ParseResponseHeaderBlock(response_headers, 313 challenge_, 314 spdy_protocol_version_); 315 316 SaveCookiesAndNotifyHeadersComplete(); 317 } 318 319 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) { 320 DCHECK_NE(INITIALIZED, state_); 321 DCHECK_NE(CONNECTING, state_); 322 if (state_ == CLOSED) 323 return; 324 if (!spdy_websocket_stream_.get()) 325 return; 326 OnSentData(socket_.get(), static_cast<int>(bytes_sent)); 327 } 328 329 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) { 330 DCHECK_NE(INITIALIZED, state_); 331 DCHECK_NE(CONNECTING, state_); 332 if (state_ == CLOSED) 333 return; 334 if (!spdy_websocket_stream_.get()) 335 return; 336 if (buffer) { 337 OnReceivedData( 338 socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize()); 339 } else { 340 OnReceivedData(socket_.get(), NULL, 0); 341 } 342 } 343 344 void WebSocketJob::OnCloseSpdyStream() { 345 spdy_websocket_stream_.reset(); 346 OnClose(socket_.get()); 347 } 348 349 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { 350 DCHECK_EQ(state_, CONNECTING); 351 if (started_to_send_handshake_request_) 352 return false; 353 if (!handshake_request_->ParseRequest(data, len)) 354 return false; 355 356 AddCookieHeaderAndSend(); 357 return true; 358 } 359 360 void WebSocketJob::AddCookieHeaderAndSend() { 361 bool allow = true; 362 if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies())) 363 allow = false; 364 365 if (socket_.get() && delegate_ && state_ == CONNECTING) { 366 handshake_request_->RemoveHeaders(kCookieHeaders, 367 arraysize(kCookieHeaders)); 368 if (allow && socket_->cookie_store()) { 369 // Add cookies, including HttpOnly cookies. 370 CookieOptions cookie_options; 371 cookie_options.set_include_httponly(); 372 socket_->cookie_store()->GetCookiesWithOptionsAsync( 373 GetURLForCookies(), cookie_options, 374 base::Bind(&WebSocketJob::LoadCookieCallback, 375 weak_ptr_factory_.GetWeakPtr())); 376 } else { 377 DoSendData(); 378 } 379 } 380 } 381 382 void WebSocketJob::LoadCookieCallback(const std::string& cookie) { 383 if (!cookie.empty()) 384 // TODO(tyoshino): Sending cookie means that connection doesn't need 385 // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id 386 // wouldn't negatively affect privacy anyway. Need to restart connection 387 // or refactor to determine cookie status prior to connecting. 388 handshake_request_->AppendHeaderIfMissing("Cookie", cookie); 389 DoSendData(); 390 } 391 392 void WebSocketJob::DoSendData() { 393 if (spdy_websocket_stream_.get()) { 394 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); 395 handshake_request_->GetRequestHeaderBlock( 396 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_); 397 spdy_websocket_stream_->SendRequest(headers.Pass()); 398 } else { 399 const std::string& handshake_request = 400 handshake_request_->GetRawRequest(); 401 handshake_request_sent_ = 0; 402 socket_->net_log()->AddEvent( 403 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, 404 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request)); 405 socket_->SendData(handshake_request.data(), 406 handshake_request.size()); 407 } 408 // Just buffered in |handshake_request_|. 409 started_to_send_handshake_request_ = true; 410 } 411 412 void WebSocketJob::OnSentHandshakeRequest( 413 SocketStream* socket, int amount_sent) { 414 DCHECK_EQ(state_, CONNECTING); 415 handshake_request_sent_ += amount_sent; 416 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); 417 if (handshake_request_sent_ >= handshake_request_->raw_length()) { 418 // handshake request has been sent. 419 // notify original size of handshake request to delegate. 420 // Reset the handshake_request_ first in case this object is deleted by the 421 // delegate. 422 size_t original_length = handshake_request_->original_length(); 423 handshake_request_.reset(); 424 if (delegate_) 425 delegate_->OnSentData(socket, original_length); 426 } 427 } 428 429 void WebSocketJob::OnReceivedHandshakeResponse( 430 SocketStream* socket, const char* data, int len) { 431 DCHECK_EQ(state_, CONNECTING); 432 if (handshake_response_->HasResponse()) { 433 // If we already has handshake response, received data should be frame 434 // data, not handshake message. 435 received_data_after_handshake_.insert( 436 received_data_after_handshake_.end(), data, data + len); 437 return; 438 } 439 440 size_t response_length = handshake_response_->ParseRawResponse(data, len); 441 if (!handshake_response_->HasResponse()) { 442 // not yet. we need more data. 443 return; 444 } 445 // handshake message is completed. 446 std::string raw_response = handshake_response_->GetRawResponse(); 447 socket_->net_log()->AddEvent( 448 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS, 449 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response)); 450 if (len - response_length > 0) { 451 // If we received extra data, it should be frame data. 452 DCHECK(received_data_after_handshake_.empty()); 453 received_data_after_handshake_.assign(data + response_length, data + len); 454 } 455 SaveCookiesAndNotifyHeadersComplete(); 456 } 457 458 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() { 459 // handshake message is completed. 460 DCHECK(handshake_response_->HasResponse()); 461 462 // Extract cookies from the handshake response into a temporary vector. 463 response_cookies_.clear(); 464 response_cookies_save_index_ = 0; 465 466 handshake_response_->GetHeaders( 467 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_); 468 469 // Now, loop over the response cookies, and attempt to persist each. 470 SaveNextCookie(); 471 } 472 473 void WebSocketJob::NotifyHeadersComplete() { 474 // Remove cookie headers, with malformed headers preserved. 475 // Actual handshake should be done in Blink. 476 handshake_response_->RemoveHeaders( 477 kSetCookieHeaders, arraysize(kSetCookieHeaders)); 478 std::string handshake_response = handshake_response_->GetResponse(); 479 handshake_response_.reset(); 480 std::vector<char> received_data(handshake_response.begin(), 481 handshake_response.end()); 482 received_data.insert(received_data.end(), 483 received_data_after_handshake_.begin(), 484 received_data_after_handshake_.end()); 485 received_data_after_handshake_.clear(); 486 487 state_ = OPEN; 488 489 DCHECK(!received_data.empty()); 490 if (delegate_) 491 delegate_->OnReceivedData( 492 socket_.get(), &received_data.front(), received_data.size()); 493 494 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 495 } 496 497 void WebSocketJob::SaveNextCookie() { 498 if (!socket_.get() || !delegate_ || state_ != CONNECTING) 499 return; 500 501 callback_pending_ = false; 502 save_next_cookie_running_ = true; 503 504 if (socket_->cookie_store()) { 505 GURL url_for_cookies = GetURLForCookies(); 506 507 CookieOptions options; 508 options.set_include_httponly(); 509 510 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since 511 // CookieMonster's asynchronous operation APIs queue the callback to run it 512 // on the thread where the API was called, there won't be race. I.e. unless 513 // the callback is run synchronously, it won't be run in parallel with this 514 // method. 515 while (!callback_pending_ && 516 response_cookies_save_index_ < response_cookies_.size()) { 517 std::string cookie = response_cookies_[response_cookies_save_index_]; 518 response_cookies_save_index_++; 519 520 if (!delegate_->CanSetCookie( 521 socket_.get(), url_for_cookies, cookie, &options)) 522 continue; 523 524 callback_pending_ = true; 525 socket_->cookie_store()->SetCookieWithOptionsAsync( 526 url_for_cookies, cookie, options, 527 base::Bind(&WebSocketJob::OnCookieSaved, 528 weak_ptr_factory_.GetWeakPtr())); 529 } 530 } 531 532 save_next_cookie_running_ = false; 533 534 if (callback_pending_) 535 return; 536 537 response_cookies_.clear(); 538 response_cookies_save_index_ = 0; 539 540 NotifyHeadersComplete(); 541 } 542 543 void WebSocketJob::OnCookieSaved(bool cookie_status) { 544 // Tell the caller of SetCookieWithOptionsAsync() that this completion 545 // callback is invoked. 546 // - If the caller checks callback_pending earlier than this callback, the 547 // caller exits to let this method continue iteration. 548 // - Otherwise, the caller continues iteration. 549 callback_pending_ = false; 550 551 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited 552 // the loop. Otherwise, return. 553 if (save_next_cookie_running_) 554 return; 555 556 SaveNextCookie(); 557 } 558 559 GURL WebSocketJob::GetURLForCookies() const { 560 GURL url = socket_->url(); 561 std::string scheme = socket_->is_secure() ? "https" : "http"; 562 url::Replacements<char> replacements; 563 replacements.SetScheme(scheme.c_str(), url::Component(0, scheme.length())); 564 return url.ReplaceComponents(replacements); 565 } 566 567 const AddressList& WebSocketJob::address_list() const { 568 return addresses_; 569 } 570 571 int WebSocketJob::TrySpdyStream() { 572 if (!socket_.get()) 573 return ERR_FAILED; 574 575 // Check if we have a SPDY session available. 576 HttpTransactionFactory* factory = 577 socket_->context()->http_transaction_factory(); 578 if (!factory) 579 return OK; 580 scoped_refptr<HttpNetworkSession> session = factory->GetSession(); 581 if (!session.get() || !session->params().enable_websocket_over_spdy) 582 return OK; 583 SpdySessionPool* spdy_pool = session->spdy_session_pool(); 584 PrivacyMode privacy_mode = socket_->privacy_mode(); 585 const SpdySessionKey key(HostPortPair::FromURL(socket_->url()), 586 socket_->proxy_server(), privacy_mode); 587 // Forbid wss downgrade to SPDY without SSL. 588 // TODO(toyoshim): Does it realize the same policy with HTTP? 589 base::WeakPtr<SpdySession> spdy_session = 590 spdy_pool->FindAvailableSession(key, *socket_->net_log()); 591 if (!spdy_session) 592 return OK; 593 594 SSLInfo ssl_info; 595 bool was_npn_negotiated; 596 NextProto protocol_negotiated = kProtoUnknown; 597 bool use_ssl = spdy_session->GetSSLInfo( 598 &ssl_info, &was_npn_negotiated, &protocol_negotiated); 599 if (socket_->is_secure() && !use_ssl) 600 return OK; 601 602 // Create SpdyWebSocketStream. 603 spdy_protocol_version_ = spdy_session->GetProtocolVersion(); 604 spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this)); 605 606 int result = spdy_websocket_stream_->InitializeStream( 607 socket_->url(), MEDIUM, *socket_->net_log()); 608 if (result == OK) { 609 OnConnected(socket_.get(), kMaxPendingSendAllowed); 610 return ERR_PROTOCOL_SWITCHED; 611 } 612 if (result != ERR_IO_PENDING) { 613 spdy_websocket_stream_.reset(); 614 return OK; 615 } 616 617 return ERR_IO_PENDING; 618 } 619 620 void WebSocketJob::SetWaiting() { 621 waiting_ = true; 622 } 623 624 bool WebSocketJob::IsWaiting() const { 625 return waiting_; 626 } 627 628 void WebSocketJob::Wakeup() { 629 if (!waiting_) 630 return; 631 waiting_ = false; 632 DCHECK(!callback_.is_null()); 633 base::MessageLoopForIO::current()->PostTask( 634 FROM_HERE, 635 base::Bind(&WebSocketJob::RetryPendingIO, 636 weak_ptr_factory_.GetWeakPtr())); 637 } 638 639 void WebSocketJob::RetryPendingIO() { 640 int result = TrySpdyStream(); 641 642 // In the case of ERR_IO_PENDING, CompleteIO() will be called from 643 // OnCreatedSpdyStream(). 644 if (result != ERR_IO_PENDING) 645 CompleteIO(result); 646 } 647 648 void WebSocketJob::CompleteIO(int result) { 649 // |callback_| may be null if OnClose() or DetachDelegate() was called. 650 if (!callback_.is_null()) { 651 CompletionCallback callback = callback_; 652 callback_.Reset(); 653 callback.Run(result); 654 Release(); // Balanced with OnStartOpenConnection(). 655 } 656 } 657 658 bool WebSocketJob::SendDataInternal(const char* data, int length) { 659 if (spdy_websocket_stream_.get()) 660 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length); 661 if (socket_.get()) 662 return socket_->SendData(data, length); 663 return false; 664 } 665 666 void WebSocketJob::CloseInternal() { 667 if (spdy_websocket_stream_.get()) 668 spdy_websocket_stream_->Close(); 669 if (socket_.get()) 670 socket_->Close(); 671 } 672 673 void WebSocketJob::SendPending() { 674 if (current_send_buffer_.get()) 675 return; 676 677 // Current buffer has been sent. Try next if any. 678 if (send_buffer_queue_.empty()) { 679 // No more data to send. 680 if (state_ == CLOSING) 681 CloseInternal(); 682 return; 683 } 684 685 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front(); 686 send_buffer_queue_.pop_front(); 687 current_send_buffer_ = 688 new DrainableIOBuffer(next_buffer.get(), next_buffer->size()); 689 SendDataInternal(current_send_buffer_->data(), 690 current_send_buffer_->BytesRemaining()); 691 } 692 693 } // namespace net 694