Home | History | Annotate | Download | only in websockets
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <algorithm>
      6 #include <limits>
      7 
      8 #include "net/websockets/websocket.h"
      9 
     10 #include "base/message_loop.h"
     11 #include "net/base/host_resolver.h"
     12 #include "net/websockets/websocket_handshake.h"
     13 #include "net/websockets/websocket_handshake_draft75.h"
     14 
     15 namespace net {
     16 
     17 static const char kClosingFrame[2] = {'\xff', '\x00'};
     18 static int64 kClosingHandshakeTimeout = 1000;  // msec.
     19 
// Creates a WebSocket in the INITIALIZED state for |request| that reports
// events to |delegate|.  The message loop that is current at construction
// time is remembered as |origin_loop_|.  No socket stream is created here;
// that happens in Connect().
WebSocket::WebSocket(Request* request, WebSocketDelegate* delegate)
    : ready_state_(INITIALIZED),
      request_(request),
      handshake_(NULL),
      delegate_(delegate),
      origin_loop_(MessageLoop::current()),
      socket_stream_(NULL),
      max_pending_send_allowed_(0),
      current_read_buf_(NULL),
      read_consumed_len_(0),
      current_write_buf_(NULL),
      server_closing_handshake_(false),
      client_closing_handshake_(false),
      closing_handshake_started_(false),
      force_close_task_(NULL),
      closing_handshake_timeout_(kClosingHandshakeTimeout) {
  // All three are required: the request, the delegate and a current loop.
  DCHECK(request_.get());
  DCHECK(delegate_);
  DCHECK(origin_loop_);
}
     40 
// Destructor.  Only asserts (in DCHECK-enabled builds) that the object has
// already been torn down: the delegate must have been cleared and the socket
// stream must have been dropped before the last reference goes away.
WebSocket::~WebSocket() {
  DCHECK(ready_state_ == INITIALIZED || !delegate_);
  DCHECK(!socket_stream_);
  DCHECK(!delegate_);
}
     46 
     47 void WebSocket::Connect() {
     48   DCHECK(ready_state_ == INITIALIZED);
     49   DCHECK(request_.get());
     50   DCHECK(delegate_);
     51   DCHECK(!socket_stream_);
     52   DCHECK(MessageLoop::current() == origin_loop_);
     53 
     54   socket_stream_ = new SocketStream(request_->url(), this);
     55   socket_stream_->set_context(request_->context());
     56 
     57   if (request_->host_resolver())
     58     socket_stream_->SetHostResolver(request_->host_resolver());
     59   if (request_->client_socket_factory())
     60     socket_stream_->SetClientSocketFactory(request_->client_socket_factory());
     61 
     62   AddRef();  // Release in DoClose().
     63   ready_state_ = CONNECTING;
     64   socket_stream_->Connect();
     65 }
     66 
     67 void WebSocket::Send(const std::string& msg) {
     68   if (ready_state_ == CLOSING || ready_state_ == CLOSED) {
     69     return;
     70   }
     71   if (client_closing_handshake_) {
     72     // We must not send any data after we start the WebSocket closing handshake.
     73     return;
     74   }
     75   DCHECK(ready_state_ == OPEN);
     76   DCHECK(MessageLoop::current() == origin_loop_);
     77 
     78   IOBufferWithSize* buf = new IOBufferWithSize(msg.size() + 2);
     79   char* p = buf->data();
     80   *p = '\0';
     81   memcpy(p + 1, msg.data(), msg.size());
     82   *(p + 1 + msg.size()) = '\xff';
     83   pending_write_bufs_.push_back(make_scoped_refptr(buf));
     84   SendPending();
     85 }
     86 
     87 void WebSocket::Close() {
     88   DCHECK(MessageLoop::current() == origin_loop_);
     89 
     90   // If connection has not yet started, do nothing.
     91   if (ready_state_ == INITIALIZED) {
     92     DCHECK(!socket_stream_);
     93     ready_state_ = CLOSED;
     94     return;
     95   }
     96 
     97   // If the readyState attribute is in the CLOSING or CLOSED state, do nothing
     98   if (ready_state_ == CLOSING || ready_state_ == CLOSED)
     99     return;
    100 
    101   if (request_->version() == DRAFT75) {
    102     DCHECK(socket_stream_);
    103     socket_stream_->Close();
    104     return;
    105   }
    106 
    107   // If the WebSocket connection is not yet established, fail the WebSocket
    108   // connection and set the readyState attribute's value to CLOSING.
    109   if (ready_state_ == CONNECTING) {
    110     ready_state_ = CLOSING;
    111     origin_loop_->PostTask(
    112         FROM_HERE,
    113         NewRunnableMethod(this, &WebSocket::FailConnection));
    114   }
    115 
    116   // If the WebSocket closing handshake has not yet been started, start
    117   // the WebSocket closing handshake and set the readyState attribute's value
    118   // to CLOSING.
    119   if (!closing_handshake_started_) {
    120     ready_state_ = CLOSING;
    121     origin_loop_->PostTask(
    122         FROM_HERE,
    123         NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
    124   }
    125 
    126   // Otherwise, set the readyState attribute's value to CLOSING.
    127   ready_state_ = CLOSING;
    128 }
    129 
    130 void WebSocket::DetachDelegate() {
    131   if (!delegate_)
    132     return;
    133   delegate_ = NULL;
    134   if (ready_state_ == INITIALIZED) {
    135     DCHECK(!socket_stream_);
    136     ready_state_ = CLOSED;
    137     return;
    138   }
    139   if (ready_state_ != CLOSED) {
    140     DCHECK(socket_stream_);
    141     socket_stream_->Close();
    142   }
    143 }
    144 
    145 void WebSocket::OnConnected(SocketStream* socket_stream,
    146                             int max_pending_send_allowed) {
    147   DCHECK(socket_stream == socket_stream_);
    148   max_pending_send_allowed_ = max_pending_send_allowed;
    149 
    150   // Use |max_pending_send_allowed| as hint for initial size of read buffer.
    151   current_read_buf_ = new GrowableIOBuffer();
    152   current_read_buf_->SetCapacity(max_pending_send_allowed_);
    153   read_consumed_len_ = 0;
    154 
    155   DCHECK(!current_write_buf_);
    156   DCHECK(!handshake_.get());
    157   switch (request_->version()) {
    158     case DEFAULT_VERSION:
    159       handshake_.reset(new WebSocketHandshake(
    160           request_->url(), request_->origin(), request_->location(),
    161           request_->protocol()));
    162       break;
    163     case DRAFT75:
    164       handshake_.reset(new WebSocketHandshakeDraft75(
    165           request_->url(), request_->origin(), request_->location(),
    166           request_->protocol()));
    167       break;
    168     default:
    169       NOTREACHED() << "Unexpected protocol version:" << request_->version();
    170   }
    171 
    172   const std::string msg = handshake_->CreateClientHandshakeMessage();
    173   IOBufferWithSize* buf = new IOBufferWithSize(msg.size());
    174   memcpy(buf->data(), msg.data(), msg.size());
    175   pending_write_bufs_.push_back(make_scoped_refptr(buf));
    176   origin_loop_->PostTask(FROM_HERE,
    177                          NewRunnableMethod(this, &WebSocket::SendPending));
    178 }
    179 
    180 void WebSocket::OnSentData(SocketStream* socket_stream, int amount_sent) {
    181   DCHECK(socket_stream == socket_stream_);
    182   DCHECK(current_write_buf_);
    183   current_write_buf_->DidConsume(amount_sent);
    184   DCHECK_GE(current_write_buf_->BytesRemaining(), 0);
    185   if (current_write_buf_->BytesRemaining() == 0) {
    186     current_write_buf_ = NULL;
    187     pending_write_bufs_.pop_front();
    188   }
    189   origin_loop_->PostTask(FROM_HERE,
    190                          NewRunnableMethod(this, &WebSocket::SendPending));
    191 }
    192 
    193 void WebSocket::OnReceivedData(SocketStream* socket_stream,
    194                                const char* data, int len) {
    195   DCHECK(socket_stream == socket_stream_);
    196   AddToReadBuffer(data, len);
    197   origin_loop_->PostTask(FROM_HERE,
    198                          NewRunnableMethod(this, &WebSocket::DoReceivedData));
    199 }
    200 
    201 void WebSocket::OnClose(SocketStream* socket_stream) {
    202   origin_loop_->PostTask(FROM_HERE,
    203                          NewRunnableMethod(this, &WebSocket::DoClose));
    204 }
    205 
    206 void WebSocket::OnError(const SocketStream* socket_stream, int error) {
    207   origin_loop_->PostTask(
    208       FROM_HERE, NewRunnableMethod(this, &WebSocket::DoSocketError, error));
    209 }
    210 
    211 void WebSocket::SendPending() {
    212   DCHECK(MessageLoop::current() == origin_loop_);
    213   if (!socket_stream_) {
    214     DCHECK_EQ(CLOSED, ready_state_);
    215     return;
    216   }
    217   if (!current_write_buf_) {
    218     if (pending_write_bufs_.empty()) {
    219       if (client_closing_handshake_) {
    220         // Already sent 0xFF and 0x00 bytes.
    221         // *The WebSocket closing handshake has started.*
    222         closing_handshake_started_ = true;
    223         if (server_closing_handshake_) {
    224           // 4.2 3-8-3 If the WebSocket connection is not already closed,
    225           // then close the WebSocket connection.
    226           // *The WebSocket closing handshake has finished*
    227           socket_stream_->Close();
    228         } else {
    229           // 5. Wait a user-agent-determined length of time, or until the
    230           // WebSocket connection is closed.
    231           force_close_task_ =
    232               NewRunnableMethod(this, &WebSocket::DoForceCloseConnection);
    233           origin_loop_->PostDelayedTask(
    234               FROM_HERE, force_close_task_, closing_handshake_timeout_);
    235         }
    236       }
    237       return;
    238     }
    239     current_write_buf_ = new DrainableIOBuffer(
    240         pending_write_bufs_.front(), pending_write_bufs_.front()->size());
    241   }
    242   DCHECK_GT(current_write_buf_->BytesRemaining(), 0);
    243   bool sent = socket_stream_->SendData(
    244       current_write_buf_->data(),
    245       std::min(current_write_buf_->BytesRemaining(),
    246                max_pending_send_allowed_));
    247   DCHECK(sent);
    248 }
    249 
    250 void WebSocket::DoReceivedData() {
    251   DCHECK(MessageLoop::current() == origin_loop_);
    252   scoped_refptr<WebSocket> protect(this);
    253   switch (ready_state_) {
    254     case CONNECTING:
    255       {
    256         DCHECK(handshake_.get());
    257         DCHECK(current_read_buf_);
    258         const char* data =
    259             current_read_buf_->StartOfBuffer() + read_consumed_len_;
    260         size_t len = current_read_buf_->offset() - read_consumed_len_;
    261         int eoh = handshake_->ReadServerHandshake(data, len);
    262         if (eoh < 0) {
    263           // Not enough data,  Retry when more data is available.
    264           return;
    265         }
    266         SkipReadBuffer(eoh);
    267       }
    268       if (handshake_->mode() != WebSocketHandshake::MODE_CONNECTED) {
    269         // Handshake failed.
    270         socket_stream_->Close();
    271         return;
    272       }
    273       ready_state_ = OPEN;
    274       if (delegate_)
    275         delegate_->OnOpen(this);
    276       if (current_read_buf_->offset() == read_consumed_len_) {
    277         // No remaining data after handshake message.
    278         break;
    279       }
    280       // FALL THROUGH
    281     case OPEN:
    282     case CLOSING:  // need to process closing-frame from server.
    283       ProcessFrameData();
    284       break;
    285 
    286     case CLOSED:
    287       // Closed just after DoReceivedData is queued on |origin_loop_|.
    288       break;
    289     default:
    290       NOTREACHED();
    291       break;
    292   }
    293 }
    294 
    295 void WebSocket::ProcessFrameData() {
    296   DCHECK(current_read_buf_);
    297   if (server_closing_handshake_) {
    298     // Any data on the connection after the 0xFF frame is discarded.
    299     return;
    300   }
    301   scoped_refptr<WebSocket> protect(this);
    302   const char* start_frame =
    303       current_read_buf_->StartOfBuffer() + read_consumed_len_;
    304   const char* next_frame = start_frame;
    305   const char* p = next_frame;
    306   const char* end =
    307       current_read_buf_->StartOfBuffer() + current_read_buf_->offset();
    308   while (p < end) {
    309     // Let /error/ be false.
    310     bool error = false;
    311 
    312     // Handle the /frame type/ byte as follows.
    313     unsigned char frame_byte = static_cast<unsigned char>(*p++);
    314     if ((frame_byte & 0x80) == 0x80) {
    315       int length = 0;
    316       while (p < end) {
    317         if (length > std::numeric_limits<int>::max() / 128) {
    318           // frame length overflow.
    319           socket_stream_->Close();
    320           return;
    321         }
    322         unsigned char c = static_cast<unsigned char>(*p);
    323         length = length * 128 + (c & 0x7f);
    324         ++p;
    325         if ((c & 0x80) != 0x80)
    326           break;
    327       }
    328       // Checks if the frame body hasn't been completely received yet.
    329       // It also checks the case the frame length bytes haven't been completely
    330       // received yet, because p == end and length > 0 in such case.
    331       if (p + length < end) {
    332         p += length;
    333         next_frame = p;
    334         if (request_->version() != DRAFT75 &&
    335             frame_byte == 0xFF && length == 0) {
    336           // 4.2 Data framing 3. Handle the /frame type/ byte.
    337           // 8. If the /frame type/ is 0xFF and the /length/ was 0, then
    338           // run the following substeps:
    339           // 1. If the WebSocket closing handshake has not yet started, then
    340           // start the WebSocket closing handshake.
    341           server_closing_handshake_ = true;
    342           if (!closing_handshake_started_) {
    343             origin_loop_->PostTask(
    344                 FROM_HERE,
    345                 NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
    346           } else {
    347             // If the WebSocket closing handshake has been started and
    348             // the WebSocket connection is not already closed, then close
    349             // the WebSocket connection.
    350             socket_stream_->Close();
    351           }
    352           return;
    353         }
    354         // 4.2 3-8 Otherwise, let /error/ be true.
    355         error = true;
    356       } else {
    357         // Not enough data in buffer.
    358         break;
    359       }
    360     } else {
    361       const char* msg_start = p;
    362       while (p < end && *p != '\xff')
    363         ++p;
    364       if (p < end && *p == '\xff') {
    365         if (frame_byte == 0x00) {
    366           if (delegate_) {
    367             delegate_->OnMessage(this, std::string(msg_start, p - msg_start));
    368           }
    369         } else {
    370           // Otherwise, discard the data and let /error/ to be true.
    371           error = true;
    372         }
    373         ++p;
    374         next_frame = p;
    375       }
    376     }
    377     // If /error/ is true, then *a WebSocket error has been detected.*
    378     if (error && delegate_)
    379       delegate_->OnError(this);
    380   }
    381   SkipReadBuffer(next_frame - start_frame);
    382 }
    383 
    384 void WebSocket::AddToReadBuffer(const char* data, int len) {
    385   DCHECK(current_read_buf_);
    386   // Check if |current_read_buf_| has enough space to store |len| of |data|.
    387   if (len >= current_read_buf_->RemainingCapacity()) {
    388     current_read_buf_->SetCapacity(
    389         current_read_buf_->offset() + len);
    390   }
    391 
    392   DCHECK(current_read_buf_->RemainingCapacity() >= len);
    393   memcpy(current_read_buf_->data(), data, len);
    394   current_read_buf_->set_offset(current_read_buf_->offset() + len);
    395 }
    396 
    397 void WebSocket::SkipReadBuffer(int len) {
    398   if (len == 0)
    399     return;
    400   DCHECK_GT(len, 0);
    401   read_consumed_len_ += len;
    402   int remaining = current_read_buf_->offset() - read_consumed_len_;
    403   DCHECK_GE(remaining, 0);
    404   if (remaining < read_consumed_len_ &&
    405       current_read_buf_->RemainingCapacity() < read_consumed_len_) {
    406     // Pre compaction:
    407     // 0             v-read_consumed_len_  v-offset               v- capacity
    408     // |..processed..| .. remaining ..     | .. RemainingCapacity |
    409     //
    410     memmove(current_read_buf_->StartOfBuffer(),
    411             current_read_buf_->StartOfBuffer() + read_consumed_len_,
    412             remaining);
    413     read_consumed_len_ = 0;
    414     current_read_buf_->set_offset(remaining);
    415     // Post compaction:
    416     // 0read_consumed_len_  v- offset                             v- capacity
    417     // |.. remaining ..     | ..  RemainingCapacity  ...          |
    418     //
    419   }
    420 }
    421 
    422 void WebSocket::StartClosingHandshake() {
    423   // 4.2 *start the WebSocket closing handshake*.
    424   if (closing_handshake_started_ || client_closing_handshake_) {
    425     // 1. If the WebSocket closing handshake has started, then abort these
    426     // steps.
    427     return;
    428   }
    429   // 2.,3. Send a 0xFF and 0x00 byte to the server.
    430   client_closing_handshake_ = true;
    431   IOBufferWithSize* buf = new IOBufferWithSize(2);
    432   memcpy(buf->data(), kClosingFrame, 2);
    433   pending_write_bufs_.push_back(make_scoped_refptr(buf));
    434   SendPending();
    435 }
    436 
    437 void WebSocket::DoForceCloseConnection() {
    438   // 4.2 *start the WebSocket closing handshake*
    439   // 6. If the WebSocket connection is not already closed, then close the
    440   // WebSocket connection.  (If this happens, then the closing handshake
    441   // doesn't finish.)
    442   DCHECK(MessageLoop::current() == origin_loop_);
    443   force_close_task_ = NULL;
    444   FailConnection();
    445 }
    446 
    447 void WebSocket::FailConnection() {
    448   DCHECK(MessageLoop::current() == origin_loop_);
    449   // 6.1 Client-initiated closure.
    450   // *fail the WebSocket connection*.
    451   // the user agent must close the WebSocket connection, and may report the
    452   // problem to the user.
    453   if (!socket_stream_)
    454     return;
    455   socket_stream_->Close();
    456 }
    457 
    458 void WebSocket::DoClose() {
    459   DCHECK(MessageLoop::current() == origin_loop_);
    460   if (force_close_task_) {
    461     // WebSocket connection is closed while waiting a user-agent-determined
    462     // length of time after *The WebSocket closing handshake has started*.
    463     force_close_task_->Cancel();
    464     force_close_task_ = NULL;
    465   }
    466   WebSocketDelegate* delegate = delegate_;
    467   delegate_ = NULL;
    468   ready_state_ = CLOSED;
    469   if (!socket_stream_)
    470     return;
    471   socket_stream_ = NULL;
    472   if (delegate)
    473     delegate->OnClose(this,
    474                       server_closing_handshake_ && closing_handshake_started_);
    475   Release();
    476 }
    477 
    478 void WebSocket::DoSocketError(int error) {
    479   DCHECK(MessageLoop::current() == origin_loop_);
    480   if (delegate_)
    481     delegate_->OnSocketError(this, error);
    482 }
    483 
    484 }  // namespace net
    485