Home | History | Annotate | Download | only in websockets
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/websockets/websocket_job.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/lazy_instance.h"
     10 #include "base/string_tokenizer.h"
     11 #include "googleurl/src/gurl.h"
     12 #include "net/base/net_errors.h"
     13 #include "net/base/net_log.h"
     14 #include "net/base/cookie_policy.h"
     15 #include "net/base/cookie_store.h"
     16 #include "net/base/io_buffer.h"
     17 #include "net/http/http_util.h"
     18 #include "net/url_request/url_request_context.h"
     19 #include "net/websockets/websocket_frame_handler.h"
     20 #include "net/websockets/websocket_handshake_handler.h"
     21 #include "net/websockets/websocket_net_log_params.h"
     22 #include "net/websockets/websocket_throttle.h"
     23 
     24 namespace {
     25 
     26 // lower-case header names.
     27 const char* const kCookieHeaders[] = {
     28   "cookie", "cookie2"
     29 };
     30 const char* const kSetCookieHeaders[] = {
     31   "set-cookie", "set-cookie2"
     32 };
     33 
     34 net::SocketStreamJob* WebSocketJobFactory(
     35     const GURL& url, net::SocketStream::Delegate* delegate) {
     36   net::WebSocketJob* job = new net::WebSocketJob(delegate);
     37   job->InitSocketStream(new net::SocketStream(url, job));
     38   return job;
     39 }
     40 
     41 class WebSocketJobInitSingleton {
     42  private:
     43   friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
     44   WebSocketJobInitSingleton() {
     45     net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
     46     net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
     47   }
     48 };
     49 
// base::LazyInstance holder for the registration helper above.
// WebSocketJob::EnsureInit() calls Get() on it to make sure the "ws"/"wss"
// factories have been registered.
static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init(
    base::LINKER_INITIALIZED);
     52 
     53 }  // anonymous namespace
     54 
     55 namespace net {
     56 
// static
// Touches the lazy instance so that WebSocketJobInitSingleton's constructor,
// which registers the "ws" and "wss" protocol factories, has run.
void WebSocketJob::EnsureInit() {
  g_websocket_job_init.Get();
}
     61 
// Creates a job in the INITIALIZED state that reports to |delegate|.
// No socket is attached here; Connect() DCHECKs that one has been set.
// Fresh handshake request/response handlers and send/receive frame handlers
// are allocated for this job.
WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
    : delegate_(delegate),
      state_(INITIALIZED),
      waiting_(false),
      callback_(NULL),
      handshake_request_(new WebSocketHandshakeRequestHandler),
      handshake_response_(new WebSocketHandshakeResponseHandler),
      handshake_request_sent_(0),
      response_cookies_save_index_(0),
      send_frame_handler_(new WebSocketFrameHandler),
      receive_frame_handler_(new WebSocketFrameHandler) {
}
     74 
// The job must be fully shut down before it is destroyed: the state is CLOSED
// and both the delegate and the socket have been dropped, which is what
// OnClose() and DetachDelegate() do.
WebSocketJob::~WebSocketJob() {
  DCHECK_EQ(CLOSED, state_);
  DCHECK(!delegate_);
  DCHECK(!socket_.get());
}
     80 
// Starts connecting the attached socket.  Expects (DCHECKs) that a socket has
// been attached and that the job is still INITIALIZED; moves the job to
// CONNECTING before asking the socket to connect.
void WebSocketJob::Connect() {
  DCHECK(socket_.get());
  DCHECK_EQ(state_, INITIALIZED);
  state_ = CONNECTING;
  socket_->Connect();
}
     87 
     88 bool WebSocketJob::SendData(const char* data, int len) {
     89   switch (state_) {
     90     case INITIALIZED:
     91       return false;
     92 
     93     case CONNECTING:
     94       return SendHandshakeRequest(data, len);
     95 
     96     case OPEN:
     97       {
     98         send_frame_handler_->AppendData(data, len);
     99         // If current buffer is sending now, this data will be sent in
    100         // SendPending() after current data was sent.
    101         // Do not buffer sending data for now.  Since
    102         // WebCore::SocketStreamHandle controls traffic to keep number of
    103         // pending bytes less than max_pending_send_allowed, so when sending
    104         // larger message than max_pending_send_allowed should not be buffered.
    105         // If we don't call OnSentData, WebCore::SocketStreamHandle would stop
    106         // sending more data when pending data reaches max_pending_send_allowed.
    107         // TODO(ukai): Fix this to support compression for larger message.
    108         int err = 0;
    109         if (!send_frame_handler_->GetCurrentBuffer() &&
    110             (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) {
    111           DCHECK(!current_buffer_);
    112           current_buffer_ = new DrainableIOBuffer(
    113               send_frame_handler_->GetCurrentBuffer(),
    114               send_frame_handler_->GetCurrentBufferSize());
    115           return socket_->SendData(
    116               current_buffer_->data(), current_buffer_->BytesRemaining());
    117         }
    118         return err >= 0;
    119       }
    120 
    121     case CLOSING:
    122     case CLOSED:
    123       return false;
    124   }
    125   return false;
    126 }
    127 
    128 void WebSocketJob::Close() {
    129   state_ = CLOSING;
    130   if (current_buffer_) {
    131     // Will close in SendPending.
    132     return;
    133   }
    134   state_ = CLOSED;
    135   socket_->Close();
    136 }
    137 
// Puts the job back into CONNECTING and forwards |username| / |password| to
// the socket's RestartWithAuth().
void WebSocketJob::RestartWithAuth(
    const string16& username,
    const string16& password) {
  state_ = CONNECTING;
  socket_->RestartWithAuth(username, password);
}
    144 
    145 void WebSocketJob::DetachDelegate() {
    146   state_ = CLOSED;
    147   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    148   WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
    149 
    150   scoped_refptr<WebSocketJob> protect(this);
    151 
    152   delegate_ = NULL;
    153   if (socket_)
    154     socket_->DetachDelegate();
    155   socket_ = NULL;
    156   if (callback_) {
    157     waiting_ = false;
    158     callback_ = NULL;
    159     Release();  // Balanced with OnStartOpenConnection().
    160   }
    161 }
    162 
    163 int WebSocketJob::OnStartOpenConnection(
    164     SocketStream* socket, CompletionCallback* callback) {
    165   DCHECK(!callback_);
    166   state_ = CONNECTING;
    167   addresses_.Copy(socket->address_list().head(), true);
    168   WebSocketThrottle::GetInstance()->PutInQueue(this);
    169   if (!waiting_)
    170     return OK;
    171   callback_ = callback;
    172   AddRef();  // Balanced when callback_ becomes NULL.
    173   return ERR_IO_PENDING;
    174 }
    175 
    176 void WebSocketJob::OnConnected(
    177     SocketStream* socket, int max_pending_send_allowed) {
    178   if (state_ == CLOSED)
    179     return;
    180   DCHECK_EQ(CONNECTING, state_);
    181   if (delegate_)
    182     delegate_->OnConnected(socket, max_pending_send_allowed);
    183 }
    184 
    185 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
    186   DCHECK_NE(INITIALIZED, state_);
    187   if (state_ == CLOSED)
    188     return;
    189   if (state_ == CONNECTING) {
    190     OnSentHandshakeRequest(socket, amount_sent);
    191     return;
    192   }
    193   if (delegate_) {
    194     DCHECK(state_ == OPEN || state_ == CLOSING);
    195     DCHECK_GT(amount_sent, 0);
    196     DCHECK(current_buffer_);
    197     current_buffer_->DidConsume(amount_sent);
    198     if (current_buffer_->BytesRemaining() > 0)
    199       return;
    200 
    201     // We need to report amount_sent of original buffer size, instead of
    202     // amount sent to |socket|.
    203     amount_sent = send_frame_handler_->GetOriginalBufferSize();
    204     DCHECK_GT(amount_sent, 0);
    205     current_buffer_ = NULL;
    206     send_frame_handler_->ReleaseCurrentBuffer();
    207     delegate_->OnSentData(socket, amount_sent);
    208     MessageLoopForIO::current()->PostTask(
    209         FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending));
    210   }
    211 }
    212 
    213 void WebSocketJob::OnReceivedData(
    214     SocketStream* socket, const char* data, int len) {
    215   DCHECK_NE(INITIALIZED, state_);
    216   if (state_ == CLOSED)
    217     return;
    218   if (state_ == CONNECTING) {
    219     OnReceivedHandshakeResponse(socket, data, len);
    220     return;
    221   }
    222   DCHECK(state_ == OPEN || state_ == CLOSING);
    223   std::string received_data;
    224   receive_frame_handler_->AppendData(data, len);
    225   // Don't buffer receiving data for now.
    226   // TODO(ukai): fix performance of WebSocketFrameHandler.
    227   while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
    228     received_data +=
    229         std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
    230                     receive_frame_handler_->GetCurrentBufferSize());
    231     receive_frame_handler_->ReleaseCurrentBuffer();
    232   }
    233   if (delegate_ && !received_data.empty())
    234       delegate_->OnReceivedData(
    235           socket, received_data.data(), received_data.size());
    236 }
    237 
    238 void WebSocketJob::OnClose(SocketStream* socket) {
    239   state_ = CLOSED;
    240   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    241   WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
    242 
    243   scoped_refptr<WebSocketJob> protect(this);
    244 
    245   SocketStream::Delegate* delegate = delegate_;
    246   delegate_ = NULL;
    247   socket_ = NULL;
    248   if (callback_) {
    249     waiting_ = false;
    250     callback_ = NULL;
    251     Release();  // Balanced with OnStartOpenConnection().
    252   }
    253   if (delegate)
    254     delegate->OnClose(socket);
    255 }
    256 
// Passes the socket's authentication challenge on to the delegate, if one is
// still attached.
void WebSocketJob::OnAuthRequired(
    SocketStream* socket, AuthChallengeInfo* auth_info) {
  if (delegate_)
    delegate_->OnAuthRequired(socket, auth_info);
}
    262 
// Passes the socket's |error| on to the delegate, if one is still attached.
void WebSocketJob::OnError(const SocketStream* socket, int error) {
  if (delegate_)
    delegate_->OnError(socket, error);
}
    267 
    268 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
    269   DCHECK_EQ(state_, CONNECTING);
    270   if (!handshake_request_->ParseRequest(data, len))
    271     return false;
    272 
    273   // handshake message is completed.
    274   AddCookieHeaderAndSend();
    275   // Just buffered in |handshake_request_|.
    276   return true;
    277 }
    278 
    279 void WebSocketJob::AddCookieHeaderAndSend() {
    280   int policy = OK;
    281   if (socket_->context()->cookie_policy()) {
    282     GURL url_for_cookies = GetURLForCookies();
    283     policy = socket_->context()->cookie_policy()->CanGetCookies(
    284         url_for_cookies,
    285         url_for_cookies);
    286   }
    287   DCHECK_NE(ERR_IO_PENDING, policy);
    288   OnCanGetCookiesCompleted(policy);
    289 }
    290 
    291 void WebSocketJob::OnCanGetCookiesCompleted(int policy) {
    292   if (socket_ && delegate_ && state_ == CONNECTING) {
    293     handshake_request_->RemoveHeaders(
    294         kCookieHeaders, arraysize(kCookieHeaders));
    295     if (policy == OK) {
    296       // Add cookies, including HttpOnly cookies.
    297       if (socket_->context()->cookie_store()) {
    298         CookieOptions cookie_options;
    299         cookie_options.set_include_httponly();
    300         std::string cookie =
    301             socket_->context()->cookie_store()->GetCookiesWithOptions(
    302                 GetURLForCookies(), cookie_options);
    303         if (!cookie.empty())
    304           handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
    305       }
    306     }
    307 
    308     const std::string& handshake_request = handshake_request_->GetRawRequest();
    309     handshake_request_sent_ = 0;
    310     socket_->net_log()->AddEvent(
    311         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
    312         make_scoped_refptr(
    313             new NetLogWebSocketHandshakeParameter(handshake_request)));
    314     socket_->SendData(handshake_request.data(),
    315                       handshake_request.size());
    316   }
    317 }
    318 
    319 void WebSocketJob::OnSentHandshakeRequest(
    320     SocketStream* socket, int amount_sent) {
    321   DCHECK_EQ(state_, CONNECTING);
    322   handshake_request_sent_ += amount_sent;
    323   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
    324   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
    325     // handshake request has been sent.
    326     // notify original size of handshake request to delegate.
    327     if (delegate_)
    328       delegate_->OnSentData(
    329           socket,
    330           handshake_request_->original_length());
    331     handshake_request_.reset();
    332   }
    333 }
    334 
    335 void WebSocketJob::OnReceivedHandshakeResponse(
    336     SocketStream* socket, const char* data, int len) {
    337   DCHECK_EQ(state_, CONNECTING);
    338   if (handshake_response_->HasResponse()) {
    339     // If we already has handshake response, received data should be frame
    340     // data, not handshake message.
    341     receive_frame_handler_->AppendData(data, len);
    342     return;
    343   }
    344 
    345   size_t response_length = handshake_response_->ParseRawResponse(data, len);
    346   if (!handshake_response_->HasResponse()) {
    347     // not yet. we need more data.
    348     return;
    349   }
    350   // handshake message is completed.
    351   socket_->net_log()->AddEvent(
    352       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
    353       make_scoped_refptr(new NetLogWebSocketHandshakeParameter(
    354           handshake_response_->GetRawResponse())));
    355   if (len - response_length > 0) {
    356     // If we received extra data, it should be frame data.
    357     receive_frame_handler_->AppendData(data + response_length,
    358                                        len - response_length);
    359   }
    360   SaveCookiesAndNotifyHeaderComplete();
    361 }
    362 
// Starts persisting the cookies carried by the completed handshake response.
// Collects the values of the set-cookie headers into |response_cookies_|,
// resets the save index, and hands over to SaveNextCookie(), which processes
// them one at a time and finally notifies the delegate.
void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
  // handshake message is completed.
  DCHECK(handshake_response_->HasResponse());

  response_cookies_.clear();
  response_cookies_save_index_ = 0;

  handshake_response_->GetHeaders(
      kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);

  // Now, loop over the response cookies, and attempt to persist each.
  SaveNextCookie();
}
    376 
    377 void WebSocketJob::SaveNextCookie() {
    378   if (response_cookies_save_index_ == response_cookies_.size()) {
    379     response_cookies_.clear();
    380     response_cookies_save_index_ = 0;
    381 
    382     // Remove cookie headers, with malformed headers preserved.
    383     // Actual handshake should be done in WebKit.
    384     handshake_response_->RemoveHeaders(
    385         kSetCookieHeaders, arraysize(kSetCookieHeaders));
    386     std::string received_data = handshake_response_->GetResponse();
    387     // Don't buffer receiving data for now.
    388     // TODO(ukai): fix performance of WebSocketFrameHandler.
    389     while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
    390       received_data +=
    391           std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
    392                       receive_frame_handler_->GetCurrentBufferSize());
    393       receive_frame_handler_->ReleaseCurrentBuffer();
    394     }
    395 
    396     state_ = OPEN;
    397     if (delegate_)
    398       delegate_->OnReceivedData(
    399           socket_, received_data.data(), received_data.size());
    400 
    401     handshake_response_.reset();
    402 
    403     WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
    404     WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
    405     return;
    406   }
    407 
    408   int policy = OK;
    409   if (socket_->context()->cookie_policy()) {
    410     GURL url_for_cookies = GetURLForCookies();
    411     policy = socket_->context()->cookie_policy()->CanSetCookie(
    412         url_for_cookies,
    413         url_for_cookies,
    414         response_cookies_[response_cookies_save_index_]);
    415   }
    416 
    417   DCHECK_NE(ERR_IO_PENDING, policy);
    418   OnCanSetCookieCompleted(policy);
    419 }
    420 
    421 void WebSocketJob::OnCanSetCookieCompleted(int policy) {
    422   if (socket_ && delegate_ && state_ == CONNECTING) {
    423     if ((policy == OK || policy == OK_FOR_SESSION_ONLY) &&
    424         socket_->context()->cookie_store()) {
    425       CookieOptions options;
    426       options.set_include_httponly();
    427       if (policy == OK_FOR_SESSION_ONLY)
    428         options.set_force_session();
    429       GURL url_for_cookies = GetURLForCookies();
    430       socket_->context()->cookie_store()->SetCookieWithOptions(
    431           url_for_cookies, response_cookies_[response_cookies_save_index_],
    432           options);
    433     }
    434     response_cookies_save_index_++;
    435     SaveNextCookie();
    436   }
    437 }
    438 
    439 GURL WebSocketJob::GetURLForCookies() const {
    440   GURL url = socket_->url();
    441   std::string scheme = socket_->is_secure() ? "https" : "http";
    442   url_canon::Replacements<char> replacements;
    443   replacements.SetScheme(scheme.c_str(),
    444                          url_parse::Component(0, scheme.length()));
    445   return url.ReplaceComponents(replacements);
    446 }
    447 
// Returns the address list copied from the socket in OnStartOpenConnection().
const AddressList& WebSocketJob::address_list() const {
  return addresses_;
}
    451 
// Marks the job as waiting.  While set, OnStartOpenConnection() defers the
// connection with ERR_IO_PENDING; Wakeup() clears the flag.
// NOTE(review): presumably called by WebSocketThrottle -- confirm.
void WebSocketJob::SetWaiting() {
  waiting_ = true;
}
    455 
// Returns whether the job is currently marked as waiting (see SetWaiting()).
bool WebSocketJob::IsWaiting() const {
  return waiting_;
}
    459 
    460 void WebSocketJob::Wakeup() {
    461   if (!waiting_)
    462     return;
    463   waiting_ = false;
    464   DCHECK(callback_);
    465   MessageLoopForIO::current()->PostTask(
    466       FROM_HERE,
    467       NewRunnableMethod(this,
    468                         &WebSocketJob::DoCallback));
    469 }
    470 
    471 void WebSocketJob::DoCallback() {
    472   // |callback_| may be NULL if OnClose() or DetachDelegate() was called.
    473   if (callback_) {
    474     net::CompletionCallback* callback = callback_;
    475     callback_ = NULL;
    476     callback->Run(net::OK);
    477     Release();  // Balanced with OnStartOpenConnection().
    478   }
    479 }
    480 
    481 void WebSocketJob::SendPending() {
    482   if (current_buffer_)
    483     return;
    484   // Current buffer is done.  Try next buffer if any.
    485   // Don't buffer sending data. See comment on case OPEN in SendData().
    486   if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
    487     // No more data to send.
    488     if (state_ == CLOSING)
    489       socket_->Close();
    490     return;
    491   }
    492   current_buffer_ = new DrainableIOBuffer(
    493       send_frame_handler_->GetCurrentBuffer(),
    494       send_frame_handler_->GetCurrentBufferSize());
    495   socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining());
    496 }
    497 
    498 }  // namespace net
    499