Home | History | Annotate | Download | only in websockets
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/websockets/websocket_job.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/bind.h"
     10 #include "base/lazy_instance.h"
     11 #include "net/base/io_buffer.h"
     12 #include "net/base/net_errors.h"
     13 #include "net/base/net_log.h"
     14 #include "net/cookies/cookie_store.h"
     15 #include "net/http/http_network_session.h"
     16 #include "net/http/http_transaction_factory.h"
     17 #include "net/http/http_util.h"
     18 #include "net/spdy/spdy_session.h"
     19 #include "net/spdy/spdy_session_pool.h"
     20 #include "net/url_request/url_request_context.h"
     21 #include "net/websockets/websocket_handshake_handler.h"
     22 #include "net/websockets/websocket_net_log_params.h"
     23 #include "net/websockets/websocket_throttle.h"
     24 #include "url/gurl.h"
     25 
     26 static const int kMaxPendingSendAllowed = 32768;  // 32 kilobytes.
     27 
     28 namespace {
     29 
     30 // lower-case header names.
     31 const char* const kCookieHeaders[] = {
     32   "cookie", "cookie2"
     33 };
     34 const char* const kSetCookieHeaders[] = {
     35   "set-cookie", "set-cookie2"
     36 };
     37 
     38 net::SocketStreamJob* WebSocketJobFactory(
     39     const GURL& url, net::SocketStream::Delegate* delegate,
     40     net::URLRequestContext* context, net::CookieStore* cookie_store) {
     41   net::WebSocketJob* job = new net::WebSocketJob(delegate);
     42   job->InitSocketStream(new net::SocketStream(url, job, context, cookie_store));
     43   return job;
     44 }
     45 
     46 class WebSocketJobInitSingleton {
     47  private:
     48   friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
     49   WebSocketJobInitSingleton() {
     50     net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
     51     net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
     52   }
     53 };
     54 
     55 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
     56     LAZY_INSTANCE_INITIALIZER;
     57 
     58 }  // anonymous namespace
     59 
     60 namespace net {
     61 
     62 // static
     63 void WebSocketJob::EnsureInit() {
     64   g_websocket_job_init.Get();
     65 }
     66 
     67 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
     68     : delegate_(delegate),
     69       state_(INITIALIZED),
     70       waiting_(false),
     71       handshake_request_(new WebSocketHandshakeRequestHandler),
     72       handshake_response_(new WebSocketHandshakeResponseHandler),
     73       started_to_send_handshake_request_(false),
     74       handshake_request_sent_(0),
     75       response_cookies_save_index_(0),
     76       spdy_protocol_version_(0),
     77       save_next_cookie_running_(false),
     78       callback_pending_(false),
     79       weak_ptr_factory_(this),
     80       weak_ptr_factory_for_send_pending_(this) {
     81 }
     82 
     83 WebSocketJob::~WebSocketJob() {
     84   DCHECK_EQ(CLOSED, state_);
     85   DCHECK(!delegate_);
     86   DCHECK(!socket_.get());
     87 }
     88 
     89 void WebSocketJob::Connect() {
     90   DCHECK(socket_.get());
     91   DCHECK_EQ(state_, INITIALIZED);
     92   state_ = CONNECTING;
     93   socket_->Connect();
     94 }
     95 
     96 bool WebSocketJob::SendData(const char* data, int len) {
     97   switch (state_) {
     98     case INITIALIZED:
     99       return false;
    100 
    101     case CONNECTING:
    102       return SendHandshakeRequest(data, len);
    103 
    104     case OPEN:
    105       {
    106         scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
    107         memcpy(buffer->data(), data, len);
    108         if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
    109           send_buffer_queue_.push_back(buffer);
    110           return true;
    111         }
    112         current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
    113         return SendDataInternal(current_send_buffer_->data(),
    114                                 current_send_buffer_->BytesRemaining());
    115       }
    116 
    117     case CLOSING:
    118     case CLOSED:
    119       return false;
    120   }
    121   return false;
    122 }
    123 
    124 void WebSocketJob::Close() {
    125   if (state_ == CLOSED)
    126     return;
    127 
    128   state_ = CLOSING;
    129   if (current_send_buffer_.get()) {
    130     // Will close in SendPending.
    131     return;
    132   }
    133   state_ = CLOSED;
    134   CloseInternal();
    135 }
    136 
    137 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
    138   state_ = CONNECTING;
    139   socket_->RestartWithAuth(credentials);
    140 }
    141 
    142 void WebSocketJob::DetachDelegate() {
    143   state_ = CLOSED;
    144   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    145 
    146   scoped_refptr<WebSocketJob> protect(this);
    147   weak_ptr_factory_.InvalidateWeakPtrs();
    148   weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
    149 
    150   delegate_ = NULL;
    151   if (socket_.get())
    152     socket_->DetachDelegate();
    153   socket_ = NULL;
    154   if (!callback_.is_null()) {
    155     waiting_ = false;
    156     callback_.Reset();
    157     Release();  // Balanced with OnStartOpenConnection().
    158   }
    159 }
    160 
    161 int WebSocketJob::OnStartOpenConnection(
    162     SocketStream* socket, const CompletionCallback& callback) {
    163   DCHECK(callback_.is_null());
    164   state_ = CONNECTING;
    165 
    166   addresses_ = socket->address_list();
    167   if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
    168     return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
    169   }
    170 
    171   if (delegate_) {
    172     int result = delegate_->OnStartOpenConnection(socket, callback);
    173     DCHECK_EQ(OK, result);
    174   }
    175   if (waiting_) {
    176     // PutInQueue() may set |waiting_| true for throttling. In this case,
    177     // Wakeup() will be called later.
    178     callback_ = callback;
    179     AddRef();  // Balanced when callback_ is cleared.
    180     return ERR_IO_PENDING;
    181   }
    182   return TrySpdyStream();
    183 }
    184 
    185 void WebSocketJob::OnConnected(
    186     SocketStream* socket, int max_pending_send_allowed) {
    187   if (state_ == CLOSED)
    188     return;
    189   DCHECK_EQ(CONNECTING, state_);
    190   if (delegate_)
    191     delegate_->OnConnected(socket, max_pending_send_allowed);
    192 }
    193 
    194 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
    195   DCHECK_NE(INITIALIZED, state_);
    196   DCHECK_GT(amount_sent, 0);
    197   if (state_ == CLOSED)
    198     return;
    199   if (state_ == CONNECTING) {
    200     OnSentHandshakeRequest(socket, amount_sent);
    201     return;
    202   }
    203   if (delegate_) {
    204     DCHECK(state_ == OPEN || state_ == CLOSING);
    205     if (!current_send_buffer_.get()) {
    206       VLOG(1)
    207           << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
    208       return;
    209     }
    210     current_send_buffer_->DidConsume(amount_sent);
    211     if (current_send_buffer_->BytesRemaining() > 0)
    212       return;
    213 
    214     // We need to report amount_sent of original buffer size, instead of
    215     // amount sent to |socket|.
    216     amount_sent = current_send_buffer_->size();
    217     DCHECK_GT(amount_sent, 0);
    218     current_send_buffer_ = NULL;
    219     if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
    220       base::MessageLoopForIO::current()->PostTask(
    221           FROM_HERE,
    222           base::Bind(&WebSocketJob::SendPending,
    223                      weak_ptr_factory_for_send_pending_.GetWeakPtr()));
    224     }
    225     delegate_->OnSentData(socket, amount_sent);
    226   }
    227 }
    228 
    229 void WebSocketJob::OnReceivedData(
    230     SocketStream* socket, const char* data, int len) {
    231   DCHECK_NE(INITIALIZED, state_);
    232   if (state_ == CLOSED)
    233     return;
    234   if (state_ == CONNECTING) {
    235     OnReceivedHandshakeResponse(socket, data, len);
    236     return;
    237   }
    238   DCHECK(state_ == OPEN || state_ == CLOSING);
    239   if (delegate_ && len > 0)
    240     delegate_->OnReceivedData(socket, data, len);
    241 }
    242 
    243 void WebSocketJob::OnClose(SocketStream* socket) {
    244   state_ = CLOSED;
    245   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    246 
    247   scoped_refptr<WebSocketJob> protect(this);
    248   weak_ptr_factory_.InvalidateWeakPtrs();
    249 
    250   SocketStream::Delegate* delegate = delegate_;
    251   delegate_ = NULL;
    252   socket_ = NULL;
    253   if (!callback_.is_null()) {
    254     waiting_ = false;
    255     callback_.Reset();
    256     Release();  // Balanced with OnStartOpenConnection().
    257   }
    258   if (delegate)
    259     delegate->OnClose(socket);
    260 }
    261 
    262 void WebSocketJob::OnAuthRequired(
    263     SocketStream* socket, AuthChallengeInfo* auth_info) {
    264   if (delegate_)
    265     delegate_->OnAuthRequired(socket, auth_info);
    266 }
    267 
    268 void WebSocketJob::OnSSLCertificateError(
    269     SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
    270   if (delegate_)
    271     delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
    272 }
    273 
    274 void WebSocketJob::OnError(const SocketStream* socket, int error) {
    275   if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
    276     delegate_->OnError(socket, error);
    277 }
    278 
    279 void WebSocketJob::OnCreatedSpdyStream(int result) {
    280   DCHECK(spdy_websocket_stream_.get());
    281   DCHECK(socket_.get());
    282   DCHECK_NE(ERR_IO_PENDING, result);
    283 
    284   if (state_ == CLOSED) {
    285     result = ERR_ABORTED;
    286   } else if (result == OK) {
    287     state_ = CONNECTING;
    288     result = ERR_PROTOCOL_SWITCHED;
    289   } else {
    290     spdy_websocket_stream_.reset();
    291   }
    292 
    293   CompleteIO(result);
    294 }
    295 
    296 void WebSocketJob::OnSentSpdyHeaders() {
    297   DCHECK_NE(INITIALIZED, state_);
    298   if (state_ != CONNECTING)
    299     return;
    300   size_t original_length = handshake_request_->original_length();
    301   handshake_request_.reset();
    302   if (delegate_)
    303     delegate_->OnSentData(socket_.get(), original_length);
    304 }
    305 
    306 void WebSocketJob::OnSpdyResponseHeadersUpdated(
    307     const SpdyHeaderBlock& response_headers) {
    308   DCHECK_NE(INITIALIZED, state_);
    309   if (state_ != CONNECTING)
    310     return;
    311   // TODO(toyoshim): Fallback to non-spdy connection?
    312   handshake_response_->ParseResponseHeaderBlock(response_headers,
    313                                                 challenge_,
    314                                                 spdy_protocol_version_);
    315 
    316   SaveCookiesAndNotifyHeadersComplete();
    317 }
    318 
    319 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
    320   DCHECK_NE(INITIALIZED, state_);
    321   DCHECK_NE(CONNECTING, state_);
    322   if (state_ == CLOSED)
    323     return;
    324   if (!spdy_websocket_stream_.get())
    325     return;
    326   OnSentData(socket_.get(), static_cast<int>(bytes_sent));
    327 }
    328 
    329 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
    330   DCHECK_NE(INITIALIZED, state_);
    331   DCHECK_NE(CONNECTING, state_);
    332   if (state_ == CLOSED)
    333     return;
    334   if (!spdy_websocket_stream_.get())
    335     return;
    336   if (buffer) {
    337     OnReceivedData(
    338         socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
    339   } else {
    340     OnReceivedData(socket_.get(), NULL, 0);
    341   }
    342 }
    343 
    344 void WebSocketJob::OnCloseSpdyStream() {
    345   spdy_websocket_stream_.reset();
    346   OnClose(socket_.get());
    347 }
    348 
    349 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
    350   DCHECK_EQ(state_, CONNECTING);
    351   if (started_to_send_handshake_request_)
    352     return false;
    353   if (!handshake_request_->ParseRequest(data, len))
    354     return false;
    355 
    356   AddCookieHeaderAndSend();
    357   return true;
    358 }
    359 
    360 void WebSocketJob::AddCookieHeaderAndSend() {
    361   bool allow = true;
    362   if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
    363     allow = false;
    364 
    365   if (socket_.get() && delegate_ && state_ == CONNECTING) {
    366     handshake_request_->RemoveHeaders(kCookieHeaders,
    367                                       arraysize(kCookieHeaders));
    368     if (allow && socket_->cookie_store()) {
    369       // Add cookies, including HttpOnly cookies.
    370       CookieOptions cookie_options;
    371       cookie_options.set_include_httponly();
    372       socket_->cookie_store()->GetCookiesWithOptionsAsync(
    373           GetURLForCookies(), cookie_options,
    374           base::Bind(&WebSocketJob::LoadCookieCallback,
    375                      weak_ptr_factory_.GetWeakPtr()));
    376     } else {
    377       DoSendData();
    378     }
    379   }
    380 }
    381 
    382 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
    383   if (!cookie.empty())
    384     // TODO(tyoshino): Sending cookie means that connection doesn't need
    385     // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id
    386     // wouldn't negatively affect privacy anyway. Need to restart connection
    387     // or refactor to determine cookie status prior to connecting.
    388     handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
    389   DoSendData();
    390 }
    391 
    392 void WebSocketJob::DoSendData() {
    393   if (spdy_websocket_stream_.get()) {
    394     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
    395     handshake_request_->GetRequestHeaderBlock(
    396         socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
    397     spdy_websocket_stream_->SendRequest(headers.Pass());
    398   } else {
    399     const std::string& handshake_request =
    400         handshake_request_->GetRawRequest();
    401     handshake_request_sent_ = 0;
    402     socket_->net_log()->AddEvent(
    403         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
    404         base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
    405     socket_->SendData(handshake_request.data(),
    406                       handshake_request.size());
    407   }
    408   // Just buffered in |handshake_request_|.
    409   started_to_send_handshake_request_ = true;
    410 }
    411 
    412 void WebSocketJob::OnSentHandshakeRequest(
    413     SocketStream* socket, int amount_sent) {
    414   DCHECK_EQ(state_, CONNECTING);
    415   handshake_request_sent_ += amount_sent;
    416   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
    417   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
    418     // handshake request has been sent.
    419     // notify original size of handshake request to delegate.
    420     // Reset the handshake_request_ first in case this object is deleted by the
    421     // delegate.
    422     size_t original_length = handshake_request_->original_length();
    423     handshake_request_.reset();
    424     if (delegate_)
    425       delegate_->OnSentData(socket, original_length);
    426   }
    427 }
    428 
    429 void WebSocketJob::OnReceivedHandshakeResponse(
    430     SocketStream* socket, const char* data, int len) {
    431   DCHECK_EQ(state_, CONNECTING);
    432   if (handshake_response_->HasResponse()) {
    433     // If we already has handshake response, received data should be frame
    434     // data, not handshake message.
    435     received_data_after_handshake_.insert(
    436         received_data_after_handshake_.end(), data, data + len);
    437     return;
    438   }
    439 
    440   size_t response_length = handshake_response_->ParseRawResponse(data, len);
    441   if (!handshake_response_->HasResponse()) {
    442     // not yet. we need more data.
    443     return;
    444   }
    445   // handshake message is completed.
    446   std::string raw_response = handshake_response_->GetRawResponse();
    447   socket_->net_log()->AddEvent(
    448       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
    449       base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
    450   if (len - response_length > 0) {
    451     // If we received extra data, it should be frame data.
    452     DCHECK(received_data_after_handshake_.empty());
    453     received_data_after_handshake_.assign(data + response_length, data + len);
    454   }
    455   SaveCookiesAndNotifyHeadersComplete();
    456 }
    457 
    458 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
    459   // handshake message is completed.
    460   DCHECK(handshake_response_->HasResponse());
    461 
    462   // Extract cookies from the handshake response into a temporary vector.
    463   response_cookies_.clear();
    464   response_cookies_save_index_ = 0;
    465 
    466   handshake_response_->GetHeaders(
    467       kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
    468 
    469   // Now, loop over the response cookies, and attempt to persist each.
    470   SaveNextCookie();
    471 }
    472 
    473 void WebSocketJob::NotifyHeadersComplete() {
    474   // Remove cookie headers, with malformed headers preserved.
    475   // Actual handshake should be done in Blink.
    476   handshake_response_->RemoveHeaders(
    477       kSetCookieHeaders, arraysize(kSetCookieHeaders));
    478   std::string handshake_response = handshake_response_->GetResponse();
    479   handshake_response_.reset();
    480   std::vector<char> received_data(handshake_response.begin(),
    481                                   handshake_response.end());
    482   received_data.insert(received_data.end(),
    483                        received_data_after_handshake_.begin(),
    484                        received_data_after_handshake_.end());
    485   received_data_after_handshake_.clear();
    486 
    487   state_ = OPEN;
    488 
    489   DCHECK(!received_data.empty());
    490   if (delegate_)
    491     delegate_->OnReceivedData(
    492         socket_.get(), &received_data.front(), received_data.size());
    493 
    494   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    495 }
    496 
    497 void WebSocketJob::SaveNextCookie() {
    498   if (!socket_.get() || !delegate_ || state_ != CONNECTING)
    499     return;
    500 
    501   callback_pending_ = false;
    502   save_next_cookie_running_ = true;
    503 
    504   if (socket_->cookie_store()) {
    505     GURL url_for_cookies = GetURLForCookies();
    506 
    507     CookieOptions options;
    508     options.set_include_httponly();
    509 
    510     // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
    511     // CookieMonster's asynchronous operation APIs queue the callback to run it
    512     // on the thread where the API was called, there won't be race. I.e. unless
    513     // the callback is run synchronously, it won't be run in parallel with this
    514     // method.
    515     while (!callback_pending_ &&
    516            response_cookies_save_index_ < response_cookies_.size()) {
    517       std::string cookie = response_cookies_[response_cookies_save_index_];
    518       response_cookies_save_index_++;
    519 
    520       if (!delegate_->CanSetCookie(
    521               socket_.get(), url_for_cookies, cookie, &options))
    522         continue;
    523 
    524       callback_pending_ = true;
    525       socket_->cookie_store()->SetCookieWithOptionsAsync(
    526           url_for_cookies, cookie, options,
    527           base::Bind(&WebSocketJob::OnCookieSaved,
    528                      weak_ptr_factory_.GetWeakPtr()));
    529     }
    530   }
    531 
    532   save_next_cookie_running_ = false;
    533 
    534   if (callback_pending_)
    535     return;
    536 
    537   response_cookies_.clear();
    538   response_cookies_save_index_ = 0;
    539 
    540   NotifyHeadersComplete();
    541 }
    542 
    543 void WebSocketJob::OnCookieSaved(bool cookie_status) {
    544   // Tell the caller of SetCookieWithOptionsAsync() that this completion
    545   // callback is invoked.
    546   // - If the caller checks callback_pending earlier than this callback, the
    547   //   caller exits to let this method continue iteration.
    548   // - Otherwise, the caller continues iteration.
    549   callback_pending_ = false;
    550 
    551   // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
    552   // the loop. Otherwise, return.
    553   if (save_next_cookie_running_)
    554     return;
    555 
    556   SaveNextCookie();
    557 }
    558 
    559 GURL WebSocketJob::GetURLForCookies() const {
    560   GURL url = socket_->url();
    561   std::string scheme = socket_->is_secure() ? "https" : "http";
    562   url::Replacements<char> replacements;
    563   replacements.SetScheme(scheme.c_str(), url::Component(0, scheme.length()));
    564   return url.ReplaceComponents(replacements);
    565 }
    566 
    567 const AddressList& WebSocketJob::address_list() const {
    568   return addresses_;
    569 }
    570 
    571 int WebSocketJob::TrySpdyStream() {
    572   if (!socket_.get())
    573     return ERR_FAILED;
    574 
    575   // Check if we have a SPDY session available.
    576   HttpTransactionFactory* factory =
    577       socket_->context()->http_transaction_factory();
    578   if (!factory)
    579     return OK;
    580   scoped_refptr<HttpNetworkSession> session = factory->GetSession();
    581   if (!session.get() || !session->params().enable_websocket_over_spdy)
    582     return OK;
    583   SpdySessionPool* spdy_pool = session->spdy_session_pool();
    584   PrivacyMode privacy_mode = socket_->privacy_mode();
    585   const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
    586                            socket_->proxy_server(), privacy_mode);
    587   // Forbid wss downgrade to SPDY without SSL.
    588   // TODO(toyoshim): Does it realize the same policy with HTTP?
    589   base::WeakPtr<SpdySession> spdy_session =
    590       spdy_pool->FindAvailableSession(key, *socket_->net_log());
    591   if (!spdy_session)
    592     return OK;
    593 
    594   SSLInfo ssl_info;
    595   bool was_npn_negotiated;
    596   NextProto protocol_negotiated = kProtoUnknown;
    597   bool use_ssl = spdy_session->GetSSLInfo(
    598       &ssl_info, &was_npn_negotiated, &protocol_negotiated);
    599   if (socket_->is_secure() && !use_ssl)
    600     return OK;
    601 
    602   // Create SpdyWebSocketStream.
    603   spdy_protocol_version_ = spdy_session->GetProtocolVersion();
    604   spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
    605 
    606   int result = spdy_websocket_stream_->InitializeStream(
    607       socket_->url(), MEDIUM, *socket_->net_log());
    608   if (result == OK) {
    609     OnConnected(socket_.get(), kMaxPendingSendAllowed);
    610     return ERR_PROTOCOL_SWITCHED;
    611   }
    612   if (result != ERR_IO_PENDING) {
    613     spdy_websocket_stream_.reset();
    614     return OK;
    615   }
    616 
    617   return ERR_IO_PENDING;
    618 }
    619 
    620 void WebSocketJob::SetWaiting() {
    621   waiting_ = true;
    622 }
    623 
    624 bool WebSocketJob::IsWaiting() const {
    625   return waiting_;
    626 }
    627 
    628 void WebSocketJob::Wakeup() {
    629   if (!waiting_)
    630     return;
    631   waiting_ = false;
    632   DCHECK(!callback_.is_null());
    633   base::MessageLoopForIO::current()->PostTask(
    634       FROM_HERE,
    635       base::Bind(&WebSocketJob::RetryPendingIO,
    636                  weak_ptr_factory_.GetWeakPtr()));
    637 }
    638 
    639 void WebSocketJob::RetryPendingIO() {
    640   int result = TrySpdyStream();
    641 
    642   // In the case of ERR_IO_PENDING, CompleteIO() will be called from
    643   // OnCreatedSpdyStream().
    644   if (result != ERR_IO_PENDING)
    645     CompleteIO(result);
    646 }
    647 
    648 void WebSocketJob::CompleteIO(int result) {
    649   // |callback_| may be null if OnClose() or DetachDelegate() was called.
    650   if (!callback_.is_null()) {
    651     CompletionCallback callback = callback_;
    652     callback_.Reset();
    653     callback.Run(result);
    654     Release();  // Balanced with OnStartOpenConnection().
    655   }
    656 }
    657 
    658 bool WebSocketJob::SendDataInternal(const char* data, int length) {
    659   if (spdy_websocket_stream_.get())
    660     return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
    661   if (socket_.get())
    662     return socket_->SendData(data, length);
    663   return false;
    664 }
    665 
    666 void WebSocketJob::CloseInternal() {
    667   if (spdy_websocket_stream_.get())
    668     spdy_websocket_stream_->Close();
    669   if (socket_.get())
    670     socket_->Close();
    671 }
    672 
    673 void WebSocketJob::SendPending() {
    674   if (current_send_buffer_.get())
    675     return;
    676 
    677   // Current buffer has been sent. Try next if any.
    678   if (send_buffer_queue_.empty()) {
    679     // No more data to send.
    680     if (state_ == CLOSING)
    681       CloseInternal();
    682     return;
    683   }
    684 
    685   scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
    686   send_buffer_queue_.pop_front();
    687   current_send_buffer_ =
    688       new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
    689   SendDataInternal(current_send_buffer_->data(),
    690                    current_send_buffer_->BytesRemaining());
    691 }
    692 
    693 }  // namespace net
    694