1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/websockets/websocket_job.h" 6 7 #include <algorithm> 8 9 #include "base/bind.h" 10 #include "base/lazy_instance.h" 11 #include "net/base/io_buffer.h" 12 #include "net/base/net_errors.h" 13 #include "net/base/net_log.h" 14 #include "net/cookies/cookie_store.h" 15 #include "net/http/http_network_session.h" 16 #include "net/http/http_transaction_factory.h" 17 #include "net/http/http_util.h" 18 #include "net/spdy/spdy_session.h" 19 #include "net/spdy/spdy_session_pool.h" 20 #include "net/url_request/url_request_context.h" 21 #include "net/websockets/websocket_handshake_handler.h" 22 #include "net/websockets/websocket_net_log_params.h" 23 #include "net/websockets/websocket_throttle.h" 24 #include "url/gurl.h" 25 26 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes. 27 28 namespace { 29 30 // lower-case header names. 31 const char* const kCookieHeaders[] = { 32 "cookie", "cookie2" 33 }; 34 const char* const kSetCookieHeaders[] = { 35 "set-cookie", "set-cookie2" 36 }; 37 38 net::SocketStreamJob* WebSocketJobFactory( 39 const GURL& url, net::SocketStream::Delegate* delegate) { 40 net::WebSocketJob* job = new net::WebSocketJob(delegate); 41 job->InitSocketStream(new net::SocketStream(url, job)); 42 return job; 43 } 44 45 class WebSocketJobInitSingleton { 46 private: 47 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>; 48 WebSocketJobInitSingleton() { 49 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory); 50 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory); 51 } 52 }; 53 54 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init = 55 LAZY_INSTANCE_INITIALIZER; 56 57 } // anonymous namespace 58 59 namespace net { 60 61 bool WebSocketJob::websocket_over_spdy_enabled_ = false; 62 63 // static 64 void WebSocketJob::EnsureInit() { 65 g_websocket_job_init.Get(); 66 } 67 68 // static 69 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) { 70 websocket_over_spdy_enabled_ = enabled; 71 } 72 73 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) 74 : delegate_(delegate), 75 state_(INITIALIZED), 76 waiting_(false), 77 handshake_request_(new WebSocketHandshakeRequestHandler), 78 handshake_response_(new WebSocketHandshakeResponseHandler), 79 started_to_send_handshake_request_(false), 80 handshake_request_sent_(0), 81 response_cookies_save_index_(0), 82 spdy_protocol_version_(0), 83 save_next_cookie_running_(false), 84 callback_pending_(false), 85 weak_ptr_factory_(this), 86 weak_ptr_factory_for_send_pending_(this) { 87 } 88 89 WebSocketJob::~WebSocketJob() { 90 DCHECK_EQ(CLOSED, state_); 91 DCHECK(!delegate_); 92 DCHECK(!socket_.get()); 93 } 94 95 void WebSocketJob::Connect() { 96 DCHECK(socket_.get()); 97 DCHECK_EQ(state_, INITIALIZED); 98 state_ = CONNECTING; 99 socket_->Connect(); 100 } 101 102 bool WebSocketJob::SendData(const char* data, int len) { 103 switch (state_) { 104 case INITIALIZED: 105 return false; 106 107 case CONNECTING: 108 return SendHandshakeRequest(data, len); 109 110 case OPEN: 111 { 112 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len); 113 memcpy(buffer->data(), data, len); 114 if (current_send_buffer_.get() || !send_buffer_queue_.empty()) { 115 send_buffer_queue_.push_back(buffer); 116 return true; 117 } 118 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len); 119 return SendDataInternal(current_send_buffer_->data(), 120 current_send_buffer_->BytesRemaining()); 121 } 122 123 case CLOSING: 124 case CLOSED: 125 return false; 126 } 127 return false; 128 } 129 130 void WebSocketJob::Close() { 131 if (state_ == CLOSED) 132 return; 133 134 state_ = CLOSING; 135 if (current_send_buffer_.get()) { 136 // Will close in SendPending. 137 return; 138 } 139 state_ = CLOSED; 140 CloseInternal(); 141 } 142 143 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) { 144 state_ = CONNECTING; 145 socket_->RestartWithAuth(credentials); 146 } 147 148 void WebSocketJob::DetachDelegate() { 149 state_ = CLOSED; 150 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 151 152 scoped_refptr<WebSocketJob> protect(this); 153 weak_ptr_factory_.InvalidateWeakPtrs(); 154 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs(); 155 156 delegate_ = NULL; 157 if (socket_.get()) 158 socket_->DetachDelegate(); 159 socket_ = NULL; 160 if (!callback_.is_null()) { 161 waiting_ = false; 162 callback_.Reset(); 163 Release(); // Balanced with OnStartOpenConnection(). 164 } 165 } 166 167 int WebSocketJob::OnStartOpenConnection( 168 SocketStream* socket, const CompletionCallback& callback) { 169 DCHECK(callback_.is_null()); 170 state_ = CONNECTING; 171 172 addresses_ = socket->address_list(); 173 if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) { 174 return ERR_WS_THROTTLE_QUEUE_TOO_LARGE; 175 } 176 177 if (delegate_) { 178 int result = delegate_->OnStartOpenConnection(socket, callback); 179 DCHECK_EQ(OK, result); 180 } 181 if (waiting_) { 182 // PutInQueue() may set |waiting_| true for throttling. In this case, 183 // Wakeup() will be called later. 184 callback_ = callback; 185 AddRef(); // Balanced when callback_ is cleared. 186 return ERR_IO_PENDING; 187 } 188 return TrySpdyStream(); 189 } 190 191 void WebSocketJob::OnConnected( 192 SocketStream* socket, int max_pending_send_allowed) { 193 if (state_ == CLOSED) 194 return; 195 DCHECK_EQ(CONNECTING, state_); 196 if (delegate_) 197 delegate_->OnConnected(socket, max_pending_send_allowed); 198 } 199 200 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) { 201 DCHECK_NE(INITIALIZED, state_); 202 DCHECK_GT(amount_sent, 0); 203 if (state_ == CLOSED) 204 return; 205 if (state_ == CONNECTING) { 206 OnSentHandshakeRequest(socket, amount_sent); 207 return; 208 } 209 if (delegate_) { 210 DCHECK(state_ == OPEN || state_ == CLOSING); 211 if (!current_send_buffer_.get()) { 212 VLOG(1) 213 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent; 214 return; 215 } 216 current_send_buffer_->DidConsume(amount_sent); 217 if (current_send_buffer_->BytesRemaining() > 0) 218 return; 219 220 // We need to report amount_sent of original buffer size, instead of 221 // amount sent to |socket|. 222 amount_sent = current_send_buffer_->size(); 223 DCHECK_GT(amount_sent, 0); 224 current_send_buffer_ = NULL; 225 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) { 226 base::MessageLoopForIO::current()->PostTask( 227 FROM_HERE, 228 base::Bind(&WebSocketJob::SendPending, 229 weak_ptr_factory_for_send_pending_.GetWeakPtr())); 230 } 231 delegate_->OnSentData(socket, amount_sent); 232 } 233 } 234 235 void WebSocketJob::OnReceivedData( 236 SocketStream* socket, const char* data, int len) { 237 DCHECK_NE(INITIALIZED, state_); 238 if (state_ == CLOSED) 239 return; 240 if (state_ == CONNECTING) { 241 OnReceivedHandshakeResponse(socket, data, len); 242 return; 243 } 244 DCHECK(state_ == OPEN || state_ == CLOSING); 245 if (delegate_ && len > 0) 246 delegate_->OnReceivedData(socket, data, len); 247 } 248 249 void WebSocketJob::OnClose(SocketStream* socket) { 250 state_ = CLOSED; 251 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 252 253 scoped_refptr<WebSocketJob> protect(this); 254 weak_ptr_factory_.InvalidateWeakPtrs(); 255 256 SocketStream::Delegate* delegate = delegate_; 257 delegate_ = NULL; 258 socket_ = NULL; 259 if (!callback_.is_null()) { 260 waiting_ = false; 261 callback_.Reset(); 262 Release(); // Balanced with OnStartOpenConnection(). 263 } 264 if (delegate) 265 delegate->OnClose(socket); 266 } 267 268 void WebSocketJob::OnAuthRequired( 269 SocketStream* socket, AuthChallengeInfo* auth_info) { 270 if (delegate_) 271 delegate_->OnAuthRequired(socket, auth_info); 272 } 273 274 void WebSocketJob::OnSSLCertificateError( 275 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) { 276 if (delegate_) 277 delegate_->OnSSLCertificateError(socket, ssl_info, fatal); 278 } 279 280 void WebSocketJob::OnError(const SocketStream* socket, int error) { 281 if (delegate_ && error != ERR_PROTOCOL_SWITCHED) 282 delegate_->OnError(socket, error); 283 } 284 285 void WebSocketJob::OnCreatedSpdyStream(int result) { 286 DCHECK(spdy_websocket_stream_.get()); 287 DCHECK(socket_.get()); 288 DCHECK_NE(ERR_IO_PENDING, result); 289 290 if (state_ == CLOSED) { 291 result = ERR_ABORTED; 292 } else if (result == OK) { 293 state_ = CONNECTING; 294 result = ERR_PROTOCOL_SWITCHED; 295 } else { 296 spdy_websocket_stream_.reset(); 297 } 298 299 CompleteIO(result); 300 } 301 302 void WebSocketJob::OnSentSpdyHeaders() { 303 DCHECK_NE(INITIALIZED, state_); 304 if (state_ != CONNECTING) 305 return; 306 if (delegate_) 307 delegate_->OnSentData(socket_.get(), handshake_request_->original_length()); 308 handshake_request_.reset(); 309 } 310 311 void WebSocketJob::OnSpdyResponseHeadersUpdated( 312 const SpdyHeaderBlock& response_headers) { 313 DCHECK_NE(INITIALIZED, state_); 314 if (state_ != CONNECTING) 315 return; 316 // TODO(toyoshim): Fallback to non-spdy connection? 317 handshake_response_->ParseResponseHeaderBlock(response_headers, 318 challenge_, 319 spdy_protocol_version_); 320 321 SaveCookiesAndNotifyHeadersComplete(); 322 } 323 324 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) { 325 DCHECK_NE(INITIALIZED, state_); 326 DCHECK_NE(CONNECTING, state_); 327 if (state_ == CLOSED) 328 return; 329 if (!spdy_websocket_stream_.get()) 330 return; 331 OnSentData(socket_.get(), static_cast<int>(bytes_sent)); 332 } 333 334 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) { 335 DCHECK_NE(INITIALIZED, state_); 336 DCHECK_NE(CONNECTING, state_); 337 if (state_ == CLOSED) 338 return; 339 if (!spdy_websocket_stream_.get()) 340 return; 341 if (buffer) { 342 OnReceivedData( 343 socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize()); 344 } else { 345 OnReceivedData(socket_.get(), NULL, 0); 346 } 347 } 348 349 void WebSocketJob::OnCloseSpdyStream() { 350 spdy_websocket_stream_.reset(); 351 OnClose(socket_.get()); 352 } 353 354 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { 355 DCHECK_EQ(state_, CONNECTING); 356 if (started_to_send_handshake_request_) 357 return false; 358 if (!handshake_request_->ParseRequest(data, len)) 359 return false; 360 361 AddCookieHeaderAndSend(); 362 return true; 363 } 364 365 void WebSocketJob::AddCookieHeaderAndSend() { 366 bool allow = true; 367 if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies())) 368 allow = false; 369 370 if (socket_.get() && delegate_ && state_ == CONNECTING) { 371 handshake_request_->RemoveHeaders(kCookieHeaders, 372 arraysize(kCookieHeaders)); 373 if (allow && socket_->context()->cookie_store()) { 374 // Add cookies, including HttpOnly cookies. 375 CookieOptions cookie_options; 376 cookie_options.set_include_httponly(); 377 socket_->context()->cookie_store()->GetCookiesWithOptionsAsync( 378 GetURLForCookies(), cookie_options, 379 base::Bind(&WebSocketJob::LoadCookieCallback, 380 weak_ptr_factory_.GetWeakPtr())); 381 } else { 382 DoSendData(); 383 } 384 } 385 } 386 387 void WebSocketJob::LoadCookieCallback(const std::string& cookie) { 388 if (!cookie.empty()) 389 // TODO(tyoshino): Sending cookie means that connection doesn't need 390 // kPrivacyModeEnabled as cookies may be server-bound and channel id 391 // wouldn't negatively affect privacy anyway. Need to restart connection 392 // or refactor to determine cookie status prior to connecting. 393 handshake_request_->AppendHeaderIfMissing("Cookie", cookie); 394 DoSendData(); 395 } 396 397 void WebSocketJob::DoSendData() { 398 if (spdy_websocket_stream_.get()) { 399 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); 400 handshake_request_->GetRequestHeaderBlock( 401 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_); 402 spdy_websocket_stream_->SendRequest(headers.Pass()); 403 } else { 404 const std::string& handshake_request = 405 handshake_request_->GetRawRequest(); 406 handshake_request_sent_ = 0; 407 socket_->net_log()->AddEvent( 408 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, 409 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request)); 410 socket_->SendData(handshake_request.data(), 411 handshake_request.size()); 412 } 413 // Just buffered in |handshake_request_|. 414 started_to_send_handshake_request_ = true; 415 } 416 417 void WebSocketJob::OnSentHandshakeRequest( 418 SocketStream* socket, int amount_sent) { 419 DCHECK_EQ(state_, CONNECTING); 420 handshake_request_sent_ += amount_sent; 421 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); 422 if (handshake_request_sent_ >= handshake_request_->raw_length()) { 423 // handshake request has been sent. 424 // notify original size of handshake request to delegate. 425 if (delegate_) 426 delegate_->OnSentData( 427 socket, 428 handshake_request_->original_length()); 429 handshake_request_.reset(); 430 } 431 } 432 433 void WebSocketJob::OnReceivedHandshakeResponse( 434 SocketStream* socket, const char* data, int len) { 435 DCHECK_EQ(state_, CONNECTING); 436 if (handshake_response_->HasResponse()) { 437 // If we already has handshake response, received data should be frame 438 // data, not handshake message. 439 received_data_after_handshake_.insert( 440 received_data_after_handshake_.end(), data, data + len); 441 return; 442 } 443 444 size_t response_length = handshake_response_->ParseRawResponse(data, len); 445 if (!handshake_response_->HasResponse()) { 446 // not yet. we need more data. 447 return; 448 } 449 // handshake message is completed. 450 std::string raw_response = handshake_response_->GetRawResponse(); 451 socket_->net_log()->AddEvent( 452 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS, 453 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response)); 454 if (len - response_length > 0) { 455 // If we received extra data, it should be frame data. 456 DCHECK(received_data_after_handshake_.empty()); 457 received_data_after_handshake_.assign(data + response_length, data + len); 458 } 459 SaveCookiesAndNotifyHeadersComplete(); 460 } 461 462 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() { 463 // handshake message is completed. 464 DCHECK(handshake_response_->HasResponse()); 465 466 // Extract cookies from the handshake response into a temporary vector. 467 response_cookies_.clear(); 468 response_cookies_save_index_ = 0; 469 470 handshake_response_->GetHeaders( 471 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_); 472 473 // Now, loop over the response cookies, and attempt to persist each. 474 SaveNextCookie(); 475 } 476 477 void WebSocketJob::NotifyHeadersComplete() { 478 // Remove cookie headers, with malformed headers preserved. 479 // Actual handshake should be done in Blink. 480 handshake_response_->RemoveHeaders( 481 kSetCookieHeaders, arraysize(kSetCookieHeaders)); 482 std::string handshake_response = handshake_response_->GetResponse(); 483 handshake_response_.reset(); 484 std::vector<char> received_data(handshake_response.begin(), 485 handshake_response.end()); 486 received_data.insert(received_data.end(), 487 received_data_after_handshake_.begin(), 488 received_data_after_handshake_.end()); 489 received_data_after_handshake_.clear(); 490 491 state_ = OPEN; 492 493 DCHECK(!received_data.empty()); 494 if (delegate_) 495 delegate_->OnReceivedData( 496 socket_.get(), &received_data.front(), received_data.size()); 497 498 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 499 } 500 501 void WebSocketJob::SaveNextCookie() { 502 if (!socket_.get() || !delegate_ || state_ != CONNECTING) 503 return; 504 505 callback_pending_ = false; 506 save_next_cookie_running_ = true; 507 508 if (socket_->context()->cookie_store()) { 509 GURL url_for_cookies = GetURLForCookies(); 510 511 CookieOptions options; 512 options.set_include_httponly(); 513 514 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since 515 // CookieMonster's asynchronous operation APIs queue the callback to run it 516 // on the thread where the API was called, there won't be race. I.e. unless 517 // the callback is run synchronously, it won't be run in parallel with this 518 // method. 519 while (!callback_pending_ && 520 response_cookies_save_index_ < response_cookies_.size()) { 521 std::string cookie = response_cookies_[response_cookies_save_index_]; 522 response_cookies_save_index_++; 523 524 if (!delegate_->CanSetCookie( 525 socket_.get(), url_for_cookies, cookie, &options)) 526 continue; 527 528 callback_pending_ = true; 529 socket_->context()->cookie_store()->SetCookieWithOptionsAsync( 530 url_for_cookies, cookie, options, 531 base::Bind(&WebSocketJob::OnCookieSaved, 532 weak_ptr_factory_.GetWeakPtr())); 533 } 534 } 535 536 save_next_cookie_running_ = false; 537 538 if (callback_pending_) 539 return; 540 541 response_cookies_.clear(); 542 response_cookies_save_index_ = 0; 543 544 NotifyHeadersComplete(); 545 } 546 547 void WebSocketJob::OnCookieSaved(bool cookie_status) { 548 // Tell the caller of SetCookieWithOptionsAsync() that this completion 549 // callback is invoked. 550 // - If the caller checks callback_pending earlier than this callback, the 551 // caller exits to let this method continue iteration. 552 // - Otherwise, the caller continues iteration. 553 callback_pending_ = false; 554 555 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited 556 // the loop. Otherwise, return. 557 if (save_next_cookie_running_) 558 return; 559 560 SaveNextCookie(); 561 } 562 563 GURL WebSocketJob::GetURLForCookies() const { 564 GURL url = socket_->url(); 565 std::string scheme = socket_->is_secure() ? "https" : "http"; 566 url_canon::Replacements<char> replacements; 567 replacements.SetScheme(scheme.c_str(), 568 url_parse::Component(0, scheme.length())); 569 return url.ReplaceComponents(replacements); 570 } 571 572 const AddressList& WebSocketJob::address_list() const { 573 return addresses_; 574 } 575 576 int WebSocketJob::TrySpdyStream() { 577 if (!socket_.get()) 578 return ERR_FAILED; 579 580 if (!websocket_over_spdy_enabled_) 581 return OK; 582 583 // Check if we have a SPDY session available. 584 HttpTransactionFactory* factory = 585 socket_->context()->http_transaction_factory(); 586 if (!factory) 587 return OK; 588 scoped_refptr<HttpNetworkSession> session = factory->GetSession(); 589 if (!session.get()) 590 return OK; 591 SpdySessionPool* spdy_pool = session->spdy_session_pool(); 592 PrivacyMode privacy_mode = socket_->privacy_mode(); 593 const SpdySessionKey key(HostPortPair::FromURL(socket_->url()), 594 socket_->proxy_server(), privacy_mode); 595 // Forbid wss downgrade to SPDY without SSL. 596 // TODO(toyoshim): Does it realize the same policy with HTTP? 597 base::WeakPtr<SpdySession> spdy_session = 598 spdy_pool->FindAvailableSession(key, *socket_->net_log()); 599 if (!spdy_session) 600 return OK; 601 602 SSLInfo ssl_info; 603 bool was_npn_negotiated; 604 NextProto protocol_negotiated = kProtoUnknown; 605 bool use_ssl = spdy_session->GetSSLInfo( 606 &ssl_info, &was_npn_negotiated, &protocol_negotiated); 607 if (socket_->is_secure() && !use_ssl) 608 return OK; 609 610 // Create SpdyWebSocketStream. 611 spdy_protocol_version_ = spdy_session->GetProtocolVersion(); 612 spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this)); 613 614 int result = spdy_websocket_stream_->InitializeStream( 615 socket_->url(), MEDIUM, *socket_->net_log()); 616 if (result == OK) { 617 OnConnected(socket_.get(), kMaxPendingSendAllowed); 618 return ERR_PROTOCOL_SWITCHED; 619 } 620 if (result != ERR_IO_PENDING) { 621 spdy_websocket_stream_.reset(); 622 return OK; 623 } 624 625 return ERR_IO_PENDING; 626 } 627 628 void WebSocketJob::SetWaiting() { 629 waiting_ = true; 630 } 631 632 bool WebSocketJob::IsWaiting() const { 633 return waiting_; 634 } 635 636 void WebSocketJob::Wakeup() { 637 if (!waiting_) 638 return; 639 waiting_ = false; 640 DCHECK(!callback_.is_null()); 641 base::MessageLoopForIO::current()->PostTask( 642 FROM_HERE, 643 base::Bind(&WebSocketJob::RetryPendingIO, 644 weak_ptr_factory_.GetWeakPtr())); 645 } 646 647 void WebSocketJob::RetryPendingIO() { 648 int result = TrySpdyStream(); 649 650 // In the case of ERR_IO_PENDING, CompleteIO() will be called from 651 // OnCreatedSpdyStream(). 652 if (result != ERR_IO_PENDING) 653 CompleteIO(result); 654 } 655 656 void WebSocketJob::CompleteIO(int result) { 657 // |callback_| may be null if OnClose() or DetachDelegate() was called. 658 if (!callback_.is_null()) { 659 CompletionCallback callback = callback_; 660 callback_.Reset(); 661 callback.Run(result); 662 Release(); // Balanced with OnStartOpenConnection(). 663 } 664 } 665 666 bool WebSocketJob::SendDataInternal(const char* data, int length) { 667 if (spdy_websocket_stream_.get()) 668 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length); 669 if (socket_.get()) 670 return socket_->SendData(data, length); 671 return false; 672 } 673 674 void WebSocketJob::CloseInternal() { 675 if (spdy_websocket_stream_.get()) 676 spdy_websocket_stream_->Close(); 677 if (socket_.get()) 678 socket_->Close(); 679 } 680 681 void WebSocketJob::SendPending() { 682 if (current_send_buffer_.get()) 683 return; 684 685 // Current buffer has been sent. Try next if any. 686 if (send_buffer_queue_.empty()) { 687 // No more data to send. 688 if (state_ == CLOSING) 689 CloseInternal(); 690 return; 691 } 692 693 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front(); 694 send_buffer_queue_.pop_front(); 695 current_send_buffer_ = 696 new DrainableIOBuffer(next_buffer.get(), next_buffer->size()); 697 SendDataInternal(current_send_buffer_->data(), 698 current_send_buffer_->BytesRemaining()); 699 } 700 701 } // namespace net 702