Home | History | Annotate | Download | only in websockets
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/websockets/websocket_job.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/bind.h"
     10 #include "base/lazy_instance.h"
     11 #include "net/base/io_buffer.h"
     12 #include "net/base/net_errors.h"
     13 #include "net/base/net_log.h"
     14 #include "net/cookies/cookie_store.h"
     15 #include "net/http/http_network_session.h"
     16 #include "net/http/http_transaction_factory.h"
     17 #include "net/http/http_util.h"
     18 #include "net/spdy/spdy_session.h"
     19 #include "net/spdy/spdy_session_pool.h"
     20 #include "net/url_request/url_request_context.h"
     21 #include "net/websockets/websocket_handshake_handler.h"
     22 #include "net/websockets/websocket_net_log_params.h"
     23 #include "net/websockets/websocket_throttle.h"
     24 #include "url/gurl.h"
     25 
// Value reported as |max_pending_send_allowed| through OnConnected() when the
// job is switched onto a SPDY stream (see TrySpdyStream()).
static const int kMaxPendingSendAllowed = 32768;  // 32 kilobytes.
     27 
     28 namespace {
     29 
// Lower-case names of the request headers that carry cookies. They are
// stripped from the handshake request in AddCookieHeaderAndSend().
const char* const kCookieHeaders[] = {
  "cookie", "cookie2"
};
// Lower-case names of the response headers that set cookies. They are read
// from the handshake response in SaveCookiesAndNotifyHeadersComplete() and
// removed from it in NotifyHeadersComplete().
const char* const kSetCookieHeaders[] = {
  "set-cookie", "set-cookie2"
};
     37 
     38 net::SocketStreamJob* WebSocketJobFactory(
     39     const GURL& url, net::SocketStream::Delegate* delegate) {
     40   net::WebSocketJob* job = new net::WebSocketJob(delegate);
     41   job->InitSocketStream(new net::SocketStream(url, job));
     42   return job;
     43 }
     44 
// Helper whose construction registers WebSocketJobFactory for the "ws" and
// "wss" schemes. The constructor is private; the friend declaration lets
// base::LazyInstance create the single instance.
class WebSocketJobInitSingleton {
 private:
  friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
  WebSocketJobInitSingleton() {
    net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
    net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
  }
};
     53 
// Lazily constructed registration helper; WebSocketJob::EnsureInit() forces
// its creation by calling Get().
static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
    LAZY_INSTANCE_INITIALIZER;
     56 
     57 }  // anonymous namespace
     58 
     59 namespace net {
     60 
// Process-wide switch consulted by TrySpdyStream(); off by default and changed
// through set_websocket_over_spdy_enabled().
bool WebSocketJob::websocket_over_spdy_enabled_ = false;
     62 
// static
// Forces creation of the lazy registration helper, whose constructor registers
// the "ws" and "wss" protocol factories.
void WebSocketJob::EnsureInit() {
  g_websocket_job_init.Get();
}
     67 
// static
// Sets the class-wide flag that TrySpdyStream() checks before looking for a
// SPDY session to carry the WebSocket connection.
void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
  websocket_over_spdy_enabled_ = enabled;
}
     72 
// Creates a job in the INITIALIZED state that reports to |delegate|. No socket
// is attached here; WebSocketJobFactory() attaches one via InitSocketStream().
// Fresh handshake request/response handlers are allocated, and all counters
// and flags start cleared.
WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
    : delegate_(delegate),
      state_(INITIALIZED),
      waiting_(false),
      handshake_request_(new WebSocketHandshakeRequestHandler),
      handshake_response_(new WebSocketHandshakeResponseHandler),
      started_to_send_handshake_request_(false),
      handshake_request_sent_(0),
      response_cookies_save_index_(0),
      spdy_protocol_version_(0),
      save_next_cookie_running_(false),
      callback_pending_(false),
      weak_ptr_factory_(this),
      weak_ptr_factory_for_send_pending_(this) {
}
     88 
// The DCHECKs assert that the job has already reached CLOSED and has dropped
// both its delegate and its socket (as OnClose() and DetachDelegate() do)
// before it is destroyed.
WebSocketJob::~WebSocketJob() {
  DCHECK_EQ(CLOSED, state_);
  DCHECK(!delegate_);
  DCHECK(!socket_.get());
}
     94 
// Starts connecting the underlying socket. Expects a socket to be attached and
// the job to still be in the INITIALIZED state; moves the job to CONNECTING.
void WebSocketJob::Connect() {
  DCHECK(socket_.get());
  DCHECK_EQ(state_, INITIALIZED);
  state_ = CONNECTING;
  socket_->Connect();
}
    101 
    102 bool WebSocketJob::SendData(const char* data, int len) {
    103   switch (state_) {
    104     case INITIALIZED:
    105       return false;
    106 
    107     case CONNECTING:
    108       return SendHandshakeRequest(data, len);
    109 
    110     case OPEN:
    111       {
    112         scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
    113         memcpy(buffer->data(), data, len);
    114         if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
    115           send_buffer_queue_.push_back(buffer);
    116           return true;
    117         }
    118         current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
    119         return SendDataInternal(current_send_buffer_->data(),
    120                                 current_send_buffer_->BytesRemaining());
    121       }
    122 
    123     case CLOSING:
    124     case CLOSED:
    125       return false;
    126   }
    127   return false;
    128 }
    129 
    130 void WebSocketJob::Close() {
    131   if (state_ == CLOSED)
    132     return;
    133 
    134   state_ = CLOSING;
    135   if (current_send_buffer_.get()) {
    136     // Will close in SendPending.
    137     return;
    138   }
    139   state_ = CLOSED;
    140   CloseInternal();
    141 }
    142 
// Puts the job back into CONNECTING and asks the socket to restart with
// |credentials|.
// NOTE(review): |socket_| is dereferenced without a null check here — confirm
// callers only invoke this while a socket is attached.
void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
  state_ = CONNECTING;
  socket_->RestartWithAuth(credentials);
}
    147 
// Severs the job from its delegate and socket. The job becomes CLOSED, leaves
// the throttle queue, invalidates the weak pointers issued by both factories,
// and abandons any connect callback stored by OnStartOpenConnection().
void WebSocketJob::DetachDelegate() {
  state_ = CLOSED;
  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);

  // Hold a reference for the rest of the method; Release() below drops the
  // one taken in OnStartOpenConnection().
  scoped_refptr<WebSocketJob> protect(this);
  weak_ptr_factory_.InvalidateWeakPtrs();
  weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();

  delegate_ = NULL;
  if (socket_.get())
    socket_->DetachDelegate();
  socket_ = NULL;
  if (!callback_.is_null()) {
    // A connect callback was stored and never run; drop it without running.
    waiting_ = false;
    callback_.Reset();
    Release();  // Balanced with OnStartOpenConnection().
  }
}
    166 
    167 int WebSocketJob::OnStartOpenConnection(
    168     SocketStream* socket, const CompletionCallback& callback) {
    169   DCHECK(callback_.is_null());
    170   state_ = CONNECTING;
    171 
    172   addresses_ = socket->address_list();
    173   if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
    174     return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
    175   }
    176 
    177   if (delegate_) {
    178     int result = delegate_->OnStartOpenConnection(socket, callback);
    179     DCHECK_EQ(OK, result);
    180   }
    181   if (waiting_) {
    182     // PutInQueue() may set |waiting_| true for throttling. In this case,
    183     // Wakeup() will be called later.
    184     callback_ = callback;
    185     AddRef();  // Balanced when callback_ is cleared.
    186     return ERR_IO_PENDING;
    187   }
    188   return TrySpdyStream();
    189 }
    190 
    191 void WebSocketJob::OnConnected(
    192     SocketStream* socket, int max_pending_send_allowed) {
    193   if (state_ == CLOSED)
    194     return;
    195   DCHECK_EQ(CONNECTING, state_);
    196   if (delegate_)
    197     delegate_->OnConnected(socket, max_pending_send_allowed);
    198 }
    199 
// SocketStream::Delegate callback invoked when |amount_sent| bytes have been
// written. While CONNECTING the bytes belong to the handshake request and are
// handled by OnSentHandshakeRequest(). Otherwise they are accounted against
// |current_send_buffer_|; once that buffer is fully drained the delegate is
// told the size of the whole original buffer and SendPending() is scheduled to
// start the next queued buffer.
void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
  DCHECK_NE(INITIALIZED, state_);
  DCHECK_GT(amount_sent, 0);
  if (state_ == CLOSED)
    return;
  if (state_ == CONNECTING) {
    OnSentHandshakeRequest(socket, amount_sent);
    return;
  }
  if (delegate_) {
    DCHECK(state_ == OPEN || state_ == CLOSING);
    if (!current_send_buffer_.get()) {
      VLOG(1)
          << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
      return;
    }
    current_send_buffer_->DidConsume(amount_sent);
    // Part of the buffer is still unsent; wait for the next notification.
    if (current_send_buffer_->BytesRemaining() > 0)
      return;

    // We need to report amount_sent of original buffer size, instead of
    // amount sent to |socket|.
    amount_sent = current_send_buffer_->size();
    DCHECK_GT(amount_sent, 0);
    current_send_buffer_ = NULL;
    // Post SendPending() only when this factory has no outstanding weak
    // pointer — presumably meaning no SendPending() task is already queued.
    if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
      base::MessageLoopForIO::current()->PostTask(
          FROM_HERE,
          base::Bind(&WebSocketJob::SendPending,
                     weak_ptr_factory_for_send_pending_.GetWeakPtr()));
    }
    delegate_->OnSentData(socket, amount_sent);
  }
}
    234 
    235 void WebSocketJob::OnReceivedData(
    236     SocketStream* socket, const char* data, int len) {
    237   DCHECK_NE(INITIALIZED, state_);
    238   if (state_ == CLOSED)
    239     return;
    240   if (state_ == CONNECTING) {
    241     OnReceivedHandshakeResponse(socket, data, len);
    242     return;
    243   }
    244   DCHECK(state_ == OPEN || state_ == CLOSING);
    245   if (delegate_ && len > 0)
    246     delegate_->OnReceivedData(socket, data, len);
    247 }
    248 
    249 void WebSocketJob::OnClose(SocketStream* socket) {
    250   state_ = CLOSED;
    251   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    252 
    253   scoped_refptr<WebSocketJob> protect(this);
    254   weak_ptr_factory_.InvalidateWeakPtrs();
    255 
    256   SocketStream::Delegate* delegate = delegate_;
    257   delegate_ = NULL;
    258   socket_ = NULL;
    259   if (!callback_.is_null()) {
    260     waiting_ = false;
    261     callback_.Reset();
    262     Release();  // Balanced with OnStartOpenConnection().
    263   }
    264   if (delegate)
    265     delegate->OnClose(socket);
    266 }
    267 
    268 void WebSocketJob::OnAuthRequired(
    269     SocketStream* socket, AuthChallengeInfo* auth_info) {
    270   if (delegate_)
    271     delegate_->OnAuthRequired(socket, auth_info);
    272 }
    273 
    274 void WebSocketJob::OnSSLCertificateError(
    275     SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
    276   if (delegate_)
    277     delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
    278 }
    279 
    280 void WebSocketJob::OnError(const SocketStream* socket, int error) {
    281   if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
    282     delegate_->OnError(socket, error);
    283 }
    284 
    285 void WebSocketJob::OnCreatedSpdyStream(int result) {
    286   DCHECK(spdy_websocket_stream_.get());
    287   DCHECK(socket_.get());
    288   DCHECK_NE(ERR_IO_PENDING, result);
    289 
    290   if (state_ == CLOSED) {
    291     result = ERR_ABORTED;
    292   } else if (result == OK) {
    293     state_ = CONNECTING;
    294     result = ERR_PROTOCOL_SWITCHED;
    295   } else {
    296     spdy_websocket_stream_.reset();
    297   }
    298 
    299   CompleteIO(result);
    300 }
    301 
    302 void WebSocketJob::OnSentSpdyHeaders() {
    303   DCHECK_NE(INITIALIZED, state_);
    304   if (state_ != CONNECTING)
    305     return;
    306   if (delegate_)
    307     delegate_->OnSentData(socket_.get(), handshake_request_->original_length());
    308   handshake_request_.reset();
    309 }
    310 
    311 void WebSocketJob::OnSpdyResponseHeadersUpdated(
    312     const SpdyHeaderBlock& response_headers) {
    313   DCHECK_NE(INITIALIZED, state_);
    314   if (state_ != CONNECTING)
    315     return;
    316   // TODO(toyoshim): Fallback to non-spdy connection?
    317   handshake_response_->ParseResponseHeaderBlock(response_headers,
    318                                                 challenge_,
    319                                                 spdy_protocol_version_);
    320 
    321   SaveCookiesAndNotifyHeadersComplete();
    322 }
    323 
    324 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
    325   DCHECK_NE(INITIALIZED, state_);
    326   DCHECK_NE(CONNECTING, state_);
    327   if (state_ == CLOSED)
    328     return;
    329   if (!spdy_websocket_stream_.get())
    330     return;
    331   OnSentData(socket_.get(), static_cast<int>(bytes_sent));
    332 }
    333 
    334 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
    335   DCHECK_NE(INITIALIZED, state_);
    336   DCHECK_NE(CONNECTING, state_);
    337   if (state_ == CLOSED)
    338     return;
    339   if (!spdy_websocket_stream_.get())
    340     return;
    341   if (buffer) {
    342     OnReceivedData(
    343         socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
    344   } else {
    345     OnReceivedData(socket_.get(), NULL, 0);
    346   }
    347 }
    348 
// Called when the SPDY stream is closed: discards the stream object and then
// runs the regular close handling via OnClose().
void WebSocketJob::OnCloseSpdyStream() {
  spdy_websocket_stream_.reset();
  OnClose(socket_.get());
}
    353 
    354 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
    355   DCHECK_EQ(state_, CONNECTING);
    356   if (started_to_send_handshake_request_)
    357     return false;
    358   if (!handshake_request_->ParseRequest(data, len))
    359     return false;
    360 
    361   AddCookieHeaderAndSend();
    362   return true;
    363 }
    364 
    365 void WebSocketJob::AddCookieHeaderAndSend() {
    366   bool allow = true;
    367   if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
    368     allow = false;
    369 
    370   if (socket_.get() && delegate_ && state_ == CONNECTING) {
    371     handshake_request_->RemoveHeaders(kCookieHeaders,
    372                                       arraysize(kCookieHeaders));
    373     if (allow && socket_->context()->cookie_store()) {
    374       // Add cookies, including HttpOnly cookies.
    375       CookieOptions cookie_options;
    376       cookie_options.set_include_httponly();
    377       socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
    378           GetURLForCookies(), cookie_options,
    379           base::Bind(&WebSocketJob::LoadCookieCallback,
    380                      weak_ptr_factory_.GetWeakPtr()));
    381     } else {
    382       DoSendData();
    383     }
    384   }
    385 }
    386 
    387 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
    388   if (!cookie.empty())
    389     // TODO(tyoshino): Sending cookie means that connection doesn't need
    390     // kPrivacyModeEnabled as cookies may be server-bound and channel id
    391     // wouldn't negatively affect privacy anyway. Need to restart connection
    392     // or refactor to determine cookie status prior to connecting.
    393     handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
    394   DoSendData();
    395 }
    396 
    397 void WebSocketJob::DoSendData() {
    398   if (spdy_websocket_stream_.get()) {
    399     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
    400     handshake_request_->GetRequestHeaderBlock(
    401         socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
    402     spdy_websocket_stream_->SendRequest(headers.Pass());
    403   } else {
    404     const std::string& handshake_request =
    405         handshake_request_->GetRawRequest();
    406     handshake_request_sent_ = 0;
    407     socket_->net_log()->AddEvent(
    408         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
    409         base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
    410     socket_->SendData(handshake_request.data(),
    411                       handshake_request.size());
    412   }
    413   // Just buffered in |handshake_request_|.
    414   started_to_send_handshake_request_ = true;
    415 }
    416 
    417 void WebSocketJob::OnSentHandshakeRequest(
    418     SocketStream* socket, int amount_sent) {
    419   DCHECK_EQ(state_, CONNECTING);
    420   handshake_request_sent_ += amount_sent;
    421   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
    422   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
    423     // handshake request has been sent.
    424     // notify original size of handshake request to delegate.
    425     if (delegate_)
    426       delegate_->OnSentData(
    427           socket,
    428           handshake_request_->original_length());
    429     handshake_request_.reset();
    430   }
    431 }
    432 
    433 void WebSocketJob::OnReceivedHandshakeResponse(
    434     SocketStream* socket, const char* data, int len) {
    435   DCHECK_EQ(state_, CONNECTING);
    436   if (handshake_response_->HasResponse()) {
    437     // If we already has handshake response, received data should be frame
    438     // data, not handshake message.
    439     received_data_after_handshake_.insert(
    440         received_data_after_handshake_.end(), data, data + len);
    441     return;
    442   }
    443 
    444   size_t response_length = handshake_response_->ParseRawResponse(data, len);
    445   if (!handshake_response_->HasResponse()) {
    446     // not yet. we need more data.
    447     return;
    448   }
    449   // handshake message is completed.
    450   std::string raw_response = handshake_response_->GetRawResponse();
    451   socket_->net_log()->AddEvent(
    452       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
    453       base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
    454   if (len - response_length > 0) {
    455     // If we received extra data, it should be frame data.
    456     DCHECK(received_data_after_handshake_.empty());
    457     received_data_after_handshake_.assign(data + response_length, data + len);
    458   }
    459   SaveCookiesAndNotifyHeadersComplete();
    460 }
    461 
// Entry point for processing a complete handshake response: collects the
// values of the Set-Cookie style headers into |response_cookies_|, resets the
// save cursor, and starts SaveNextCookie(), which ends by calling
// NotifyHeadersComplete().
void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
  // handshake message is completed.
  DCHECK(handshake_response_->HasResponse());

  // Extract cookies from the handshake response into a temporary vector.
  response_cookies_.clear();
  response_cookies_save_index_ = 0;

  handshake_response_->GetHeaders(
      kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);

  // Now, loop over the response cookies, and attempt to persist each.
  SaveNextCookie();
}
    476 
    477 void WebSocketJob::NotifyHeadersComplete() {
    478   // Remove cookie headers, with malformed headers preserved.
    479   // Actual handshake should be done in Blink.
    480   handshake_response_->RemoveHeaders(
    481       kSetCookieHeaders, arraysize(kSetCookieHeaders));
    482   std::string handshake_response = handshake_response_->GetResponse();
    483   handshake_response_.reset();
    484   std::vector<char> received_data(handshake_response.begin(),
    485                                   handshake_response.end());
    486   received_data.insert(received_data.end(),
    487                        received_data_after_handshake_.begin(),
    488                        received_data_after_handshake_.end());
    489   received_data_after_handshake_.clear();
    490 
    491   state_ = OPEN;
    492 
    493   DCHECK(!received_data.empty());
    494   if (delegate_)
    495     delegate_->OnReceivedData(
    496         socket_.get(), &received_data.front(), received_data.size());
    497 
    498   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    499 }
    500 
// Persists the cookies collected in |response_cookies_|, starting at
// |response_cookies_save_index_|, and calls NotifyHeadersComplete() once none
// are left to save. Cookies that the delegate rejects via CanSetCookie() are
// skipped. Each save is asynchronous; |callback_pending_| and
// |save_next_cookie_running_| coordinate with OnCookieSaved() so that the
// iteration continues in exactly one place whether the completion callback
// runs synchronously or later. Does nothing unless the job is CONNECTING with
// both a socket and a delegate.
void WebSocketJob::SaveNextCookie() {
  if (!socket_.get() || !delegate_ || state_ != CONNECTING)
    return;

  callback_pending_ = false;
  save_next_cookie_running_ = true;

  if (socket_->context()->cookie_store()) {
    GURL url_for_cookies = GetURLForCookies();

    CookieOptions options;
    options.set_include_httponly();

    // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
    // CookieMonster's asynchronous operation APIs queue the callback to run it
    // on the thread where the API was called, there won't be race. I.e. unless
    // the callback is run synchronously, it won't be run in parallel with this
    // method.
    while (!callback_pending_ &&
           response_cookies_save_index_ < response_cookies_.size()) {
      std::string cookie = response_cookies_[response_cookies_save_index_];
      response_cookies_save_index_++;

      if (!delegate_->CanSetCookie(
              socket_.get(), url_for_cookies, cookie, &options))
        continue;

      // Set before the call: a synchronous OnCookieSaved() clears it again,
      // which lets the loop carry on with the next cookie.
      callback_pending_ = true;
      socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
          url_for_cookies, cookie, options,
          base::Bind(&WebSocketJob::OnCookieSaved,
                     weak_ptr_factory_.GetWeakPtr()));
    }
  }

  save_next_cookie_running_ = false;

  // A save is still outstanding; OnCookieSaved() will resume the iteration.
  if (callback_pending_)
    return;

  response_cookies_.clear();
  response_cookies_save_index_ = 0;

  NotifyHeadersComplete();
}
    546 
    547 void WebSocketJob::OnCookieSaved(bool cookie_status) {
    548   // Tell the caller of SetCookieWithOptionsAsync() that this completion
    549   // callback is invoked.
    550   // - If the caller checks callback_pending earlier than this callback, the
    551   //   caller exits to let this method continue iteration.
    552   // - Otherwise, the caller continues iteration.
    553   callback_pending_ = false;
    554 
    555   // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
    556   // the loop. Otherwise, return.
    557   if (save_next_cookie_running_)
    558     return;
    559 
    560   SaveNextCookie();
    561 }
    562 
    563 GURL WebSocketJob::GetURLForCookies() const {
    564   GURL url = socket_->url();
    565   std::string scheme = socket_->is_secure() ? "https" : "http";
    566   url_canon::Replacements<char> replacements;
    567   replacements.SetScheme(scheme.c_str(),
    568                          url_parse::Component(0, scheme.length()));
    569   return url.ReplaceComponents(replacements);
    570 }
    571 
// Returns the address list captured from the socket in
// OnStartOpenConnection().
const AddressList& WebSocketJob::address_list() const {
  return addresses_;
}
    575 
// Tries to carry this WebSocket connection over an already available SPDY
// session.
// Returns:
//   ERR_FAILED             - no socket is attached.
//   OK                     - SPDY is not used (feature disabled, no usable
//                            session, a secure socket without an SSL-backed
//                            session, or stream initialization failed).
//   ERR_PROTOCOL_SWITCHED  - a SPDY stream was initialized synchronously;
//                            OnConnected() has already been called.
//   ERR_IO_PENDING         - stream initialization is in progress; the caller
//                            is told to expect OnCreatedSpdyStream() (see
//                            RetryPendingIO()).
int WebSocketJob::TrySpdyStream() {
  if (!socket_.get())
    return ERR_FAILED;

  if (!websocket_over_spdy_enabled_)
    return OK;

  // Check if we have a SPDY session available.
  HttpTransactionFactory* factory =
      socket_->context()->http_transaction_factory();
  if (!factory)
    return OK;
  scoped_refptr<HttpNetworkSession> session = factory->GetSession();
  if (!session.get())
    return OK;
  SpdySessionPool* spdy_pool = session->spdy_session_pool();
  PrivacyMode privacy_mode = socket_->privacy_mode();
  const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
                           socket_->proxy_server(), privacy_mode);
  base::WeakPtr<SpdySession> spdy_session =
      spdy_pool->FindAvailableSession(key, *socket_->net_log());
  if (!spdy_session)
    return OK;

  // Only the boolean result of GetSSLInfo() is used; the out-parameters are
  // required by the call but not read afterwards.
  SSLInfo ssl_info;
  bool was_npn_negotiated;
  NextProto protocol_negotiated = kProtoUnknown;
  bool use_ssl = spdy_session->GetSSLInfo(
      &ssl_info, &was_npn_negotiated, &protocol_negotiated);
  // Forbid wss downgrade to SPDY without SSL.
  // TODO(toyoshim): Does it realize the same policy with HTTP?
  if (socket_->is_secure() && !use_ssl)
    return OK;

  // Create SpdyWebSocketStream.
  spdy_protocol_version_ = spdy_session->GetProtocolVersion();
  spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));

  int result = spdy_websocket_stream_->InitializeStream(
      socket_->url(), MEDIUM, *socket_->net_log());
  if (result == OK) {
    OnConnected(socket_.get(), kMaxPendingSendAllowed);
    return ERR_PROTOCOL_SWITCHED;
  }
  if (result != ERR_IO_PENDING) {
    // Initialization failed outright: drop the stream and report OK so the
    // connection proceeds without SPDY.
    spdy_websocket_stream_.reset();
    return OK;
  }

  return ERR_IO_PENDING;
}
    627 
// Marks the job as waiting. OnStartOpenConnection() notes that PutInQueue()
// may set this for throttling; Wakeup() clears it.
void WebSocketJob::SetWaiting() {
  waiting_ = true;
}
    631 
// Returns whether the job is currently marked as waiting (see SetWaiting()).
bool WebSocketJob::IsWaiting() const {
  return waiting_;
}
    635 
    636 void WebSocketJob::Wakeup() {
    637   if (!waiting_)
    638     return;
    639   waiting_ = false;
    640   DCHECK(!callback_.is_null());
    641   base::MessageLoopForIO::current()->PostTask(
    642       FROM_HERE,
    643       base::Bind(&WebSocketJob::RetryPendingIO,
    644                  weak_ptr_factory_.GetWeakPtr()));
    645 }
    646 
    647 void WebSocketJob::RetryPendingIO() {
    648   int result = TrySpdyStream();
    649 
    650   // In the case of ERR_IO_PENDING, CompleteIO() will be called from
    651   // OnCreatedSpdyStream().
    652   if (result != ERR_IO_PENDING)
    653     CompleteIO(result);
    654 }
    655 
    656 void WebSocketJob::CompleteIO(int result) {
    657   // |callback_| may be null if OnClose() or DetachDelegate() was called.
    658   if (!callback_.is_null()) {
    659     CompletionCallback callback = callback_;
    660     callback_.Reset();
    661     callback.Run(result);
    662     Release();  // Balanced with OnStartOpenConnection().
    663   }
    664 }
    665 
    666 bool WebSocketJob::SendDataInternal(const char* data, int length) {
    667   if (spdy_websocket_stream_.get())
    668     return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
    669   if (socket_.get())
    670     return socket_->SendData(data, length);
    671   return false;
    672 }
    673 
// Closes the SPDY stream, if one is attached, and then the socket, if one is
// attached. |socket_| is re-checked after the stream's Close() call rather
// than assumed to still be set.
void WebSocketJob::CloseInternal() {
  if (spdy_websocket_stream_.get())
    spdy_websocket_stream_->Close();
  if (socket_.get())
    socket_->Close();
}
    680 
    681 void WebSocketJob::SendPending() {
    682   if (current_send_buffer_.get())
    683     return;
    684 
    685   // Current buffer has been sent. Try next if any.
    686   if (send_buffer_queue_.empty()) {
    687     // No more data to send.
    688     if (state_ == CLOSING)
    689       CloseInternal();
    690     return;
    691   }
    692 
    693   scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
    694   send_buffer_queue_.pop_front();
    695   current_send_buffer_ =
    696       new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
    697   SendDataInternal(current_send_buffer_->data(),
    698                    current_send_buffer_->BytesRemaining());
    699 }
    700 
    701 }  // namespace net
    702