Home | History | Annotate | Download | only in websockets
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/websockets/websocket_job.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/bind.h"
     10 #include "base/lazy_instance.h"
     11 #include "net/base/io_buffer.h"
     12 #include "net/base/net_errors.h"
     13 #include "net/base/net_log.h"
     14 #include "net/cookies/cookie_store.h"
     15 #include "net/http/http_network_session.h"
     16 #include "net/http/http_transaction_factory.h"
     17 #include "net/http/http_util.h"
     18 #include "net/spdy/spdy_session.h"
     19 #include "net/spdy/spdy_session_pool.h"
     20 #include "net/url_request/url_request_context.h"
     21 #include "net/websockets/websocket_handshake_handler.h"
     22 #include "net/websockets/websocket_net_log_params.h"
     23 #include "net/websockets/websocket_throttle.h"
     24 #include "url/gurl.h"
     25 
     26 static const int kMaxPendingSendAllowed = 32768;  // 32 kilobytes.
     27 
     28 namespace {
     29 
     30 // lower-case header names.
     31 const char* const kCookieHeaders[] = {
     32   "cookie", "cookie2"
     33 };
     34 const char* const kSetCookieHeaders[] = {
     35   "set-cookie", "set-cookie2"
     36 };
     37 
     38 net::SocketStreamJob* WebSocketJobFactory(
     39     const GURL& url, net::SocketStream::Delegate* delegate) {
     40   net::WebSocketJob* job = new net::WebSocketJob(delegate);
     41   job->InitSocketStream(new net::SocketStream(url, job));
     42   return job;
     43 }
     44 
     45 class WebSocketJobInitSingleton {
     46  private:
     47   friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
     48   WebSocketJobInitSingleton() {
     49     net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
     50     net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
     51   }
     52 };
     53 
     54 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
     55     LAZY_INSTANCE_INITIALIZER;
     56 
     57 }  // anonymous namespace
     58 
     59 namespace net {
     60 
     61 bool WebSocketJob::websocket_over_spdy_enabled_ = false;
     62 
     63 // static
     64 void WebSocketJob::EnsureInit() {
     65   g_websocket_job_init.Get();
     66 }
     67 
     68 // static
     69 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
     70   websocket_over_spdy_enabled_ = enabled;
     71 }
     72 
     73 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
     74     : delegate_(delegate),
     75       state_(INITIALIZED),
     76       waiting_(false),
     77       handshake_request_(new WebSocketHandshakeRequestHandler),
     78       handshake_response_(new WebSocketHandshakeResponseHandler),
     79       started_to_send_handshake_request_(false),
     80       handshake_request_sent_(0),
     81       response_cookies_save_index_(0),
     82       spdy_protocol_version_(0),
     83       save_next_cookie_running_(false),
     84       callback_pending_(false),
     85       weak_ptr_factory_(this),
     86       weak_ptr_factory_for_send_pending_(this) {
     87 }
     88 
     89 WebSocketJob::~WebSocketJob() {
     90   DCHECK_EQ(CLOSED, state_);
     91   DCHECK(!delegate_);
     92   DCHECK(!socket_.get());
     93 }
     94 
     95 void WebSocketJob::Connect() {
     96   DCHECK(socket_.get());
     97   DCHECK_EQ(state_, INITIALIZED);
     98   state_ = CONNECTING;
     99   socket_->Connect();
    100 }
    101 
    102 bool WebSocketJob::SendData(const char* data, int len) {
    103   switch (state_) {
    104     case INITIALIZED:
    105       return false;
    106 
    107     case CONNECTING:
    108       return SendHandshakeRequest(data, len);
    109 
    110     case OPEN:
    111       {
    112         scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
    113         memcpy(buffer->data(), data, len);
    114         if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
    115           send_buffer_queue_.push_back(buffer);
    116           return true;
    117         }
    118         current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
    119         return SendDataInternal(current_send_buffer_->data(),
    120                                 current_send_buffer_->BytesRemaining());
    121       }
    122 
    123     case CLOSING:
    124     case CLOSED:
    125       return false;
    126   }
    127   return false;
    128 }
    129 
    130 void WebSocketJob::Close() {
    131   if (state_ == CLOSED)
    132     return;
    133 
    134   state_ = CLOSING;
    135   if (current_send_buffer_.get()) {
    136     // Will close in SendPending.
    137     return;
    138   }
    139   state_ = CLOSED;
    140   CloseInternal();
    141 }
    142 
    143 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
    144   state_ = CONNECTING;
    145   socket_->RestartWithAuth(credentials);
    146 }
    147 
    148 void WebSocketJob::DetachDelegate() {
    149   state_ = CLOSED;
    150   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    151 
    152   scoped_refptr<WebSocketJob> protect(this);
    153   weak_ptr_factory_.InvalidateWeakPtrs();
    154   weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
    155 
    156   delegate_ = NULL;
    157   if (socket_.get())
    158     socket_->DetachDelegate();
    159   socket_ = NULL;
    160   if (!callback_.is_null()) {
    161     waiting_ = false;
    162     callback_.Reset();
    163     Release();  // Balanced with OnStartOpenConnection().
    164   }
    165 }
    166 
    167 int WebSocketJob::OnStartOpenConnection(
    168     SocketStream* socket, const CompletionCallback& callback) {
    169   DCHECK(callback_.is_null());
    170   state_ = CONNECTING;
    171 
    172   addresses_ = socket->address_list();
    173   if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
    174     return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
    175   }
    176 
    177   if (delegate_) {
    178     int result = delegate_->OnStartOpenConnection(socket, callback);
    179     DCHECK_EQ(OK, result);
    180   }
    181   if (waiting_) {
    182     // PutInQueue() may set |waiting_| true for throttling. In this case,
    183     // Wakeup() will be called later.
    184     callback_ = callback;
    185     AddRef();  // Balanced when callback_ is cleared.
    186     return ERR_IO_PENDING;
    187   }
    188   return TrySpdyStream();
    189 }
    190 
    191 void WebSocketJob::OnConnected(
    192     SocketStream* socket, int max_pending_send_allowed) {
    193   if (state_ == CLOSED)
    194     return;
    195   DCHECK_EQ(CONNECTING, state_);
    196   if (delegate_)
    197     delegate_->OnConnected(socket, max_pending_send_allowed);
    198 }
    199 
    200 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
    201   DCHECK_NE(INITIALIZED, state_);
    202   DCHECK_GT(amount_sent, 0);
    203   if (state_ == CLOSED)
    204     return;
    205   if (state_ == CONNECTING) {
    206     OnSentHandshakeRequest(socket, amount_sent);
    207     return;
    208   }
    209   if (delegate_) {
    210     DCHECK(state_ == OPEN || state_ == CLOSING);
    211     if (!current_send_buffer_.get()) {
    212       VLOG(1)
    213           << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
    214       return;
    215     }
    216     current_send_buffer_->DidConsume(amount_sent);
    217     if (current_send_buffer_->BytesRemaining() > 0)
    218       return;
    219 
    220     // We need to report amount_sent of original buffer size, instead of
    221     // amount sent to |socket|.
    222     amount_sent = current_send_buffer_->size();
    223     DCHECK_GT(amount_sent, 0);
    224     current_send_buffer_ = NULL;
    225     if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
    226       base::MessageLoopForIO::current()->PostTask(
    227           FROM_HERE,
    228           base::Bind(&WebSocketJob::SendPending,
    229                      weak_ptr_factory_for_send_pending_.GetWeakPtr()));
    230     }
    231     delegate_->OnSentData(socket, amount_sent);
    232   }
    233 }
    234 
    235 void WebSocketJob::OnReceivedData(
    236     SocketStream* socket, const char* data, int len) {
    237   DCHECK_NE(INITIALIZED, state_);
    238   if (state_ == CLOSED)
    239     return;
    240   if (state_ == CONNECTING) {
    241     OnReceivedHandshakeResponse(socket, data, len);
    242     return;
    243   }
    244   DCHECK(state_ == OPEN || state_ == CLOSING);
    245   if (delegate_ && len > 0)
    246     delegate_->OnReceivedData(socket, data, len);
    247 }
    248 
    249 void WebSocketJob::OnClose(SocketStream* socket) {
    250   state_ = CLOSED;
    251   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    252 
    253   scoped_refptr<WebSocketJob> protect(this);
    254   weak_ptr_factory_.InvalidateWeakPtrs();
    255 
    256   SocketStream::Delegate* delegate = delegate_;
    257   delegate_ = NULL;
    258   socket_ = NULL;
    259   if (!callback_.is_null()) {
    260     waiting_ = false;
    261     callback_.Reset();
    262     Release();  // Balanced with OnStartOpenConnection().
    263   }
    264   if (delegate)
    265     delegate->OnClose(socket);
    266 }
    267 
    268 void WebSocketJob::OnAuthRequired(
    269     SocketStream* socket, AuthChallengeInfo* auth_info) {
    270   if (delegate_)
    271     delegate_->OnAuthRequired(socket, auth_info);
    272 }
    273 
    274 void WebSocketJob::OnSSLCertificateError(
    275     SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
    276   if (delegate_)
    277     delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
    278 }
    279 
    280 void WebSocketJob::OnError(const SocketStream* socket, int error) {
    281   if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
    282     delegate_->OnError(socket, error);
    283 }
    284 
    285 void WebSocketJob::OnCreatedSpdyStream(int result) {
    286   DCHECK(spdy_websocket_stream_.get());
    287   DCHECK(socket_.get());
    288   DCHECK_NE(ERR_IO_PENDING, result);
    289 
    290   if (state_ == CLOSED) {
    291     result = ERR_ABORTED;
    292   } else if (result == OK) {
    293     state_ = CONNECTING;
    294     result = ERR_PROTOCOL_SWITCHED;
    295   } else {
    296     spdy_websocket_stream_.reset();
    297   }
    298 
    299   CompleteIO(result);
    300 }
    301 
    302 void WebSocketJob::OnSentSpdyHeaders() {
    303   DCHECK_NE(INITIALIZED, state_);
    304   if (state_ != CONNECTING)
    305     return;
    306   if (delegate_)
    307     delegate_->OnSentData(socket_.get(), handshake_request_->original_length());
    308   handshake_request_.reset();
    309 }
    310 
    311 void WebSocketJob::OnSpdyResponseHeadersUpdated(
    312     const SpdyHeaderBlock& response_headers) {
    313   DCHECK_NE(INITIALIZED, state_);
    314   if (state_ != CONNECTING)
    315     return;
    316   // TODO(toyoshim): Fallback to non-spdy connection?
    317   handshake_response_->ParseResponseHeaderBlock(response_headers,
    318                                                 challenge_,
    319                                                 spdy_protocol_version_);
    320 
    321   SaveCookiesAndNotifyHeadersComplete();
    322 }
    323 
    324 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
    325   DCHECK_NE(INITIALIZED, state_);
    326   DCHECK_NE(CONNECTING, state_);
    327   if (state_ == CLOSED)
    328     return;
    329   if (!spdy_websocket_stream_.get())
    330     return;
    331   OnSentData(socket_.get(), static_cast<int>(bytes_sent));
    332 }
    333 
    334 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
    335   DCHECK_NE(INITIALIZED, state_);
    336   DCHECK_NE(CONNECTING, state_);
    337   if (state_ == CLOSED)
    338     return;
    339   if (!spdy_websocket_stream_.get())
    340     return;
    341   if (buffer) {
    342     OnReceivedData(
    343         socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
    344   } else {
    345     OnReceivedData(socket_.get(), NULL, 0);
    346   }
    347 }
    348 
    349 void WebSocketJob::OnCloseSpdyStream() {
    350   spdy_websocket_stream_.reset();
    351   OnClose(socket_.get());
    352 }
    353 
    354 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
    355   DCHECK_EQ(state_, CONNECTING);
    356   if (started_to_send_handshake_request_)
    357     return false;
    358   if (!handshake_request_->ParseRequest(data, len))
    359     return false;
    360 
    361   // handshake message is completed.
    362   handshake_response_->set_protocol_version(
    363       handshake_request_->protocol_version());
    364   AddCookieHeaderAndSend();
    365   return true;
    366 }
    367 
    368 void WebSocketJob::AddCookieHeaderAndSend() {
    369   bool allow = true;
    370   if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
    371     allow = false;
    372 
    373   if (socket_.get() && delegate_ && state_ == CONNECTING) {
    374     handshake_request_->RemoveHeaders(kCookieHeaders,
    375                                       arraysize(kCookieHeaders));
    376     if (allow && socket_->context()->cookie_store()) {
    377       // Add cookies, including HttpOnly cookies.
    378       CookieOptions cookie_options;
    379       cookie_options.set_include_httponly();
    380       socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
    381           GetURLForCookies(), cookie_options,
    382           base::Bind(&WebSocketJob::LoadCookieCallback,
    383                      weak_ptr_factory_.GetWeakPtr()));
    384     } else {
    385       DoSendData();
    386     }
    387   }
    388 }
    389 
    390 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
    391   if (!cookie.empty())
    392     // TODO(tyoshino): Sending cookie means that connection doesn't need
    393     // kPrivacyModeEnabled as cookies may be server-bound and channel id
    394     // wouldn't negatively affect privacy anyway. Need to restart connection
    395     // or refactor to determine cookie status prior to connecting.
    396     handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
    397   DoSendData();
    398 }
    399 
    400 void WebSocketJob::DoSendData() {
    401   if (spdy_websocket_stream_.get()) {
    402     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
    403     handshake_request_->GetRequestHeaderBlock(
    404         socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
    405     spdy_websocket_stream_->SendRequest(headers.Pass());
    406   } else {
    407     const std::string& handshake_request =
    408         handshake_request_->GetRawRequest();
    409     handshake_request_sent_ = 0;
    410     socket_->net_log()->AddEvent(
    411         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
    412         base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
    413     socket_->SendData(handshake_request.data(),
    414                       handshake_request.size());
    415   }
    416   // Just buffered in |handshake_request_|.
    417   started_to_send_handshake_request_ = true;
    418 }
    419 
    420 void WebSocketJob::OnSentHandshakeRequest(
    421     SocketStream* socket, int amount_sent) {
    422   DCHECK_EQ(state_, CONNECTING);
    423   handshake_request_sent_ += amount_sent;
    424   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
    425   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
    426     // handshake request has been sent.
    427     // notify original size of handshake request to delegate.
    428     if (delegate_)
    429       delegate_->OnSentData(
    430           socket,
    431           handshake_request_->original_length());
    432     handshake_request_.reset();
    433   }
    434 }
    435 
    436 void WebSocketJob::OnReceivedHandshakeResponse(
    437     SocketStream* socket, const char* data, int len) {
    438   DCHECK_EQ(state_, CONNECTING);
    439   if (handshake_response_->HasResponse()) {
    440     // If we already has handshake response, received data should be frame
    441     // data, not handshake message.
    442     received_data_after_handshake_.insert(
    443         received_data_after_handshake_.end(), data, data + len);
    444     return;
    445   }
    446 
    447   size_t response_length = handshake_response_->ParseRawResponse(data, len);
    448   if (!handshake_response_->HasResponse()) {
    449     // not yet. we need more data.
    450     return;
    451   }
    452   // handshake message is completed.
    453   std::string raw_response = handshake_response_->GetRawResponse();
    454   socket_->net_log()->AddEvent(
    455       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
    456       base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
    457   if (len - response_length > 0) {
    458     // If we received extra data, it should be frame data.
    459     DCHECK(received_data_after_handshake_.empty());
    460     received_data_after_handshake_.assign(data + response_length, data + len);
    461   }
    462   SaveCookiesAndNotifyHeadersComplete();
    463 }
    464 
    465 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
    466   // handshake message is completed.
    467   DCHECK(handshake_response_->HasResponse());
    468 
    469   // Extract cookies from the handshake response into a temporary vector.
    470   response_cookies_.clear();
    471   response_cookies_save_index_ = 0;
    472 
    473   handshake_response_->GetHeaders(
    474       kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
    475 
    476   // Now, loop over the response cookies, and attempt to persist each.
    477   SaveNextCookie();
    478 }
    479 
    480 void WebSocketJob::NotifyHeadersComplete() {
    481   // Remove cookie headers, with malformed headers preserved.
    482   // Actual handshake should be done in Blink.
    483   handshake_response_->RemoveHeaders(
    484       kSetCookieHeaders, arraysize(kSetCookieHeaders));
    485   std::string handshake_response = handshake_response_->GetResponse();
    486   handshake_response_.reset();
    487   std::vector<char> received_data(handshake_response.begin(),
    488                                   handshake_response.end());
    489   received_data.insert(received_data.end(),
    490                        received_data_after_handshake_.begin(),
    491                        received_data_after_handshake_.end());
    492   received_data_after_handshake_.clear();
    493 
    494   state_ = OPEN;
    495 
    496   DCHECK(!received_data.empty());
    497   if (delegate_)
    498     delegate_->OnReceivedData(
    499         socket_.get(), &received_data.front(), received_data.size());
    500 
    501   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    502 }
    503 
    504 void WebSocketJob::SaveNextCookie() {
    505   if (!socket_.get() || !delegate_ || state_ != CONNECTING)
    506     return;
    507 
    508   callback_pending_ = false;
    509   save_next_cookie_running_ = true;
    510 
    511   if (socket_->context()->cookie_store()) {
    512     GURL url_for_cookies = GetURLForCookies();
    513 
    514     CookieOptions options;
    515     options.set_include_httponly();
    516 
    517     // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
    518     // CookieMonster's asynchronous operation APIs queue the callback to run it
    519     // on the thread where the API was called, there won't be race. I.e. unless
    520     // the callback is run synchronously, it won't be run in parallel with this
    521     // method.
    522     while (!callback_pending_ &&
    523            response_cookies_save_index_ < response_cookies_.size()) {
    524       std::string cookie = response_cookies_[response_cookies_save_index_];
    525       response_cookies_save_index_++;
    526 
    527       if (!delegate_->CanSetCookie(
    528               socket_.get(), url_for_cookies, cookie, &options))
    529         continue;
    530 
    531       callback_pending_ = true;
    532       socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
    533           url_for_cookies, cookie, options,
    534           base::Bind(&WebSocketJob::OnCookieSaved,
    535                      weak_ptr_factory_.GetWeakPtr()));
    536     }
    537   }
    538 
    539   save_next_cookie_running_ = false;
    540 
    541   if (callback_pending_)
    542     return;
    543 
    544   response_cookies_.clear();
    545   response_cookies_save_index_ = 0;
    546 
    547   NotifyHeadersComplete();
    548 }
    549 
    550 void WebSocketJob::OnCookieSaved(bool cookie_status) {
    551   // Tell the caller of SetCookieWithOptionsAsync() that this completion
    552   // callback is invoked.
    553   // - If the caller checks callback_pending earlier than this callback, the
    554   //   caller exits to let this method continue iteration.
    555   // - Otherwise, the caller continues iteration.
    556   callback_pending_ = false;
    557 
    558   // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
    559   // the loop. Otherwise, return.
    560   if (save_next_cookie_running_)
    561     return;
    562 
    563   SaveNextCookie();
    564 }
    565 
    566 GURL WebSocketJob::GetURLForCookies() const {
    567   GURL url = socket_->url();
    568   std::string scheme = socket_->is_secure() ? "https" : "http";
    569   url_canon::Replacements<char> replacements;
    570   replacements.SetScheme(scheme.c_str(),
    571                          url_parse::Component(0, scheme.length()));
    572   return url.ReplaceComponents(replacements);
    573 }
    574 
    575 const AddressList& WebSocketJob::address_list() const {
    576   return addresses_;
    577 }
    578 
    579 int WebSocketJob::TrySpdyStream() {
    580   if (!socket_.get())
    581     return ERR_FAILED;
    582 
    583   if (!websocket_over_spdy_enabled_)
    584     return OK;
    585 
    586   // Check if we have a SPDY session available.
    587   HttpTransactionFactory* factory =
    588       socket_->context()->http_transaction_factory();
    589   if (!factory)
    590     return OK;
    591   scoped_refptr<HttpNetworkSession> session = factory->GetSession();
    592   if (!session.get())
    593     return OK;
    594   SpdySessionPool* spdy_pool = session->spdy_session_pool();
    595   PrivacyMode privacy_mode = socket_->privacy_mode();
    596   const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
    597                            socket_->proxy_server(), privacy_mode);
    598   // Forbid wss downgrade to SPDY without SSL.
    599   // TODO(toyoshim): Does it realize the same policy with HTTP?
    600   base::WeakPtr<SpdySession> spdy_session =
    601       spdy_pool->FindAvailableSession(key, *socket_->net_log());
    602   if (!spdy_session)
    603     return OK;
    604 
    605   SSLInfo ssl_info;
    606   bool was_npn_negotiated;
    607   NextProto protocol_negotiated = kProtoUnknown;
    608   bool use_ssl = spdy_session->GetSSLInfo(
    609       &ssl_info, &was_npn_negotiated, &protocol_negotiated);
    610   if (socket_->is_secure() && !use_ssl)
    611     return OK;
    612 
    613   // Create SpdyWebSocketStream.
    614   spdy_protocol_version_ = spdy_session->GetProtocolVersion();
    615   spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
    616 
    617   int result = spdy_websocket_stream_->InitializeStream(
    618       socket_->url(), MEDIUM, *socket_->net_log());
    619   if (result == OK) {
    620     OnConnected(socket_.get(), kMaxPendingSendAllowed);
    621     return ERR_PROTOCOL_SWITCHED;
    622   }
    623   if (result != ERR_IO_PENDING) {
    624     spdy_websocket_stream_.reset();
    625     return OK;
    626   }
    627 
    628   return ERR_IO_PENDING;
    629 }
    630 
    631 void WebSocketJob::SetWaiting() {
    632   waiting_ = true;
    633 }
    634 
    635 bool WebSocketJob::IsWaiting() const {
    636   return waiting_;
    637 }
    638 
    639 void WebSocketJob::Wakeup() {
    640   if (!waiting_)
    641     return;
    642   waiting_ = false;
    643   DCHECK(!callback_.is_null());
    644   base::MessageLoopForIO::current()->PostTask(
    645       FROM_HERE,
    646       base::Bind(&WebSocketJob::RetryPendingIO,
    647                  weak_ptr_factory_.GetWeakPtr()));
    648 }
    649 
    650 void WebSocketJob::RetryPendingIO() {
    651   int result = TrySpdyStream();
    652 
    653   // In the case of ERR_IO_PENDING, CompleteIO() will be called from
    654   // OnCreatedSpdyStream().
    655   if (result != ERR_IO_PENDING)
    656     CompleteIO(result);
    657 }
    658 
    659 void WebSocketJob::CompleteIO(int result) {
    660   // |callback_| may be null if OnClose() or DetachDelegate() was called.
    661   if (!callback_.is_null()) {
    662     CompletionCallback callback = callback_;
    663     callback_.Reset();
    664     callback.Run(result);
    665     Release();  // Balanced with OnStartOpenConnection().
    666   }
    667 }
    668 
    669 bool WebSocketJob::SendDataInternal(const char* data, int length) {
    670   if (spdy_websocket_stream_.get())
    671     return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
    672   if (socket_.get())
    673     return socket_->SendData(data, length);
    674   return false;
    675 }
    676 
    677 void WebSocketJob::CloseInternal() {
    678   if (spdy_websocket_stream_.get())
    679     spdy_websocket_stream_->Close();
    680   if (socket_.get())
    681     socket_->Close();
    682 }
    683 
    684 void WebSocketJob::SendPending() {
    685   if (current_send_buffer_.get())
    686     return;
    687 
    688   // Current buffer has been sent. Try next if any.
    689   if (send_buffer_queue_.empty()) {
    690     // No more data to send.
    691     if (state_ == CLOSING)
    692       CloseInternal();
    693     return;
    694   }
    695 
    696   scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
    697   send_buffer_queue_.pop_front();
    698   current_send_buffer_ =
    699       new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
    700   SendDataInternal(current_send_buffer_->data(),
    701                    current_send_buffer_->BytesRemaining());
    702 }
    703 
    704 }  // namespace net
    705