Home | History | Annotate | Download | only in websockets
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/websockets/websocket_channel.h"
      6 
      7 #include <limits.h>  // for INT_MAX
      8 
      9 #include <algorithm>
     10 #include <deque>
     11 
     12 #include "base/basictypes.h"  // for size_t
     13 #include "base/big_endian.h"
     14 #include "base/bind.h"
     15 #include "base/compiler_specific.h"
     16 #include "base/memory/ref_counted.h"
     17 #include "base/memory/weak_ptr.h"
     18 #include "base/message_loop/message_loop.h"
     19 #include "base/metrics/histogram.h"
     20 #include "base/numerics/safe_conversions.h"
     21 #include "base/stl_util.h"
     22 #include "base/strings/stringprintf.h"
     23 #include "base/time/time.h"
     24 #include "net/base/io_buffer.h"
     25 #include "net/base/net_log.h"
     26 #include "net/http/http_request_headers.h"
     27 #include "net/http/http_response_headers.h"
     28 #include "net/http/http_util.h"
     29 #include "net/websockets/websocket_errors.h"
     30 #include "net/websockets/websocket_event_interface.h"
     31 #include "net/websockets/websocket_frame.h"
     32 #include "net/websockets/websocket_handshake_request_info.h"
     33 #include "net/websockets/websocket_handshake_response_info.h"
     34 #include "net/websockets/websocket_mux.h"
     35 #include "net/websockets/websocket_stream.h"
     36 #include "url/origin.h"
     37 
     38 namespace net {
     39 
     40 namespace {
     41 
     42 using base::StreamingUtf8Validator;
     43 
     44 const int kDefaultSendQuotaLowWaterMark = 1 << 16;
     45 const int kDefaultSendQuotaHighWaterMark = 1 << 17;
     46 const size_t kWebSocketCloseCodeLength = 2;
     47 // This timeout is based on TCPMaximumSegmentLifetime * 2 from
     48 // MainThreadWebSocketChannel.cpp in Blink.
     49 const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60;
     50 
     51 typedef WebSocketEventInterface::ChannelState ChannelState;
     52 const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
     53 const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
     54 
     55 // Maximum close reason length = max control frame payload -
     56 //                               status code length
     57 //                             = 125 - 2
     58 const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;
     59 
     60 // Check a close status code for strict compliance with RFC6455. This is only
     61 // used for close codes received from a renderer that we are intending to send
     62 // out over the network. See ParseClose() for the restrictions on incoming close
     63 // codes. The |code| parameter is type int for convenience of implementation;
     64 // the real type is uint16. Code 1005 is treated specially; it cannot be set
     65 // explicitly by Javascript but the renderer uses it to indicate we should send
     66 // a Close frame with no payload.
     67 bool IsStrictlyValidCloseStatusCode(int code) {
     68   static const int kInvalidRanges[] = {
     69       // [BAD, OK)
     70       0,    1000,   // 1000 is the first valid code
     71       1006, 1007,   // 1006 MUST NOT be set.
     72       1014, 3000,   // 1014 unassigned; 1015 up to 2999 are reserved.
     73       5000, 65536,  // Codes above 5000 are invalid.
     74   };
     75   const int* const kInvalidRangesEnd =
     76       kInvalidRanges + arraysize(kInvalidRanges);
     77 
     78   DCHECK_GE(code, 0);
     79   DCHECK_LT(code, 65536);
     80   const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
     81   DCHECK_NE(kInvalidRangesEnd, upper);
     82   DCHECK_GT(upper, kInvalidRanges);
     83   DCHECK_GT(*upper, code);
     84   DCHECK_LE(*(upper - 1), code);
     85   return ((upper - kInvalidRanges) % 2) == 0;
     86 }
     87 
     88 // This function avoids a bunch of boilerplate code.
     89 void AllowUnused(ChannelState ALLOW_UNUSED unused) {}
     90 
     91 // Sets |name| to the name of the frame type for the given |opcode|. Note that
     92 // for all of Text, Binary and Continuation opcode, this method returns
     93 // "Data frame".
     94 void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
     95                            std::string* name) {
     96   switch (opcode) {
     97     case WebSocketFrameHeader::kOpCodeText:    // fall-thru
     98     case WebSocketFrameHeader::kOpCodeBinary:  // fall-thru
     99     case WebSocketFrameHeader::kOpCodeContinuation:
    100       *name = "Data frame";
    101       break;
    102 
    103     case WebSocketFrameHeader::kOpCodePing:
    104       *name = "Ping";
    105       break;
    106 
    107     case WebSocketFrameHeader::kOpCodePong:
    108       *name = "Pong";
    109       break;
    110 
    111     case WebSocketFrameHeader::kOpCodeClose:
    112       *name = "Close";
    113       break;
    114 
    115     default:
    116       *name = "Unknown frame type";
    117       break;
    118   }
    119 
    120   return;
    121 }
    122 
    123 }  // namespace
    124 
    125 // A class to encapsulate a set of frames and information about the size of
    126 // those frames.
    127 class WebSocketChannel::SendBuffer {
    128  public:
    129   SendBuffer() : total_bytes_(0) {}
    130 
    131   // Add a WebSocketFrame to the buffer and increase total_bytes_.
    132   void AddFrame(scoped_ptr<WebSocketFrame> chunk);
    133 
    134   // Return a pointer to the frames_ for write purposes.
    135   ScopedVector<WebSocketFrame>* frames() { return &frames_; }
    136 
    137  private:
    138   // The frames_ that will be sent in the next call to WriteFrames().
    139   ScopedVector<WebSocketFrame> frames_;
    140 
    141   // The total size of the payload data in |frames_|. This will be used to
    142   // measure the throughput of the link.
    143   // TODO(ricea): Measure the throughput of the link.
    144   size_t total_bytes_;
    145 };
    146 
    147 void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
    148   total_bytes_ += frame->header.payload_length;
    149   frames_.push_back(frame.release());
    150 }
    151 
    152 // Implementation of WebSocketStream::ConnectDelegate that simply forwards the
    153 // calls on to the WebSocketChannel that created it.
    154 class WebSocketChannel::ConnectDelegate
    155     : public WebSocketStream::ConnectDelegate {
    156  public:
    157   explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}
    158 
    159   virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
    160     creator_->OnConnectSuccess(stream.Pass());
    161     // |this| may have been deleted.
    162   }
    163 
    164   virtual void OnFailure(const std::string& message) OVERRIDE {
    165     creator_->OnConnectFailure(message);
    166     // |this| has been deleted.
    167   }
    168 
    169   virtual void OnStartOpeningHandshake(
    170       scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
    171     creator_->OnStartOpeningHandshake(request.Pass());
    172   }
    173 
    174   virtual void OnFinishOpeningHandshake(
    175       scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
    176     creator_->OnFinishOpeningHandshake(response.Pass());
    177   }
    178 
    179   virtual void OnSSLCertificateError(
    180       scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks>
    181           ssl_error_callbacks,
    182       const SSLInfo& ssl_info,
    183       bool fatal) OVERRIDE {
    184     creator_->OnSSLCertificateError(
    185         ssl_error_callbacks.Pass(), ssl_info, fatal);
    186   }
    187 
    188  private:
    189   // A pointer to the WebSocketChannel that created this object. There is no
    190   // danger of this pointer being stale, because deleting the WebSocketChannel
    191   // cancels the connect process, deleting this object and preventing its
    192   // callbacks from being called.
    193   WebSocketChannel* const creator_;
    194 
    195   DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
    196 };
    197 
    198 class WebSocketChannel::HandshakeNotificationSender
    199     : public base::SupportsWeakPtr<HandshakeNotificationSender> {
    200  public:
    201   explicit HandshakeNotificationSender(WebSocketChannel* channel);
    202   ~HandshakeNotificationSender();
    203 
    204   static void Send(base::WeakPtr<HandshakeNotificationSender> sender);
    205 
    206   ChannelState SendImmediately(WebSocketEventInterface* event_interface);
    207 
    208   const WebSocketHandshakeRequestInfo* handshake_request_info() const {
    209     return handshake_request_info_.get();
    210   }
    211 
    212   void set_handshake_request_info(
    213       scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
    214     handshake_request_info_ = request_info.Pass();
    215   }
    216 
    217   const WebSocketHandshakeResponseInfo* handshake_response_info() const {
    218     return handshake_response_info_.get();
    219   }
    220 
    221   void set_handshake_response_info(
    222       scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
    223     handshake_response_info_ = response_info.Pass();
    224   }
    225 
    226  private:
    227   WebSocketChannel* owner_;
    228   scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
    229   scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
    230 };
    231 
    232 WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
    233     WebSocketChannel* channel)
    234     : owner_(channel) {}
    235 
    236 WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
    237 
    238 void WebSocketChannel::HandshakeNotificationSender::Send(
    239     base::WeakPtr<HandshakeNotificationSender> sender) {
    240   // Do nothing if |sender| is already destructed.
    241   if (sender) {
    242     WebSocketChannel* channel = sender->owner_;
    243     AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
    244   }
    245 }
    246 
    247 ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
    248     WebSocketEventInterface* event_interface) {
    249 
    250   if (handshake_request_info_.get()) {
    251     if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
    252                                handshake_request_info_.Pass()))
    253       return CHANNEL_DELETED;
    254   }
    255 
    256   if (handshake_response_info_.get()) {
    257     if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
    258                                handshake_response_info_.Pass()))
    259       return CHANNEL_DELETED;
    260 
    261     // TODO(yhirano): We can release |this| to save memory because
    262     // there will be no more opening handshake notification.
    263   }
    264 
    265   return CHANNEL_ALIVE;
    266 }
    267 
    268 WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
    269     bool final,
    270     WebSocketFrameHeader::OpCode opcode,
    271     const scoped_refptr<IOBuffer>& data,
    272     size_t offset,
    273     size_t size)
    274     : final_(final),
    275       opcode_(opcode),
    276       data_(data),
    277       offset_(offset),
    278       size_(size) {}
    279 
    280 WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}
    281 
    282 void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
    283   DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
    284   opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
    285 }
    286 
    287 void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
    288   DCHECK_LE(offset_, size_);
    289   DCHECK_LE(bytes, size_ - offset_);
    290   offset_ += bytes;
    291 }
    292 
    293 WebSocketChannel::WebSocketChannel(
    294     scoped_ptr<WebSocketEventInterface> event_interface,
    295     URLRequestContext* url_request_context)
    296     : event_interface_(event_interface.Pass()),
    297       url_request_context_(url_request_context),
    298       send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
    299       send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
    300       current_send_quota_(0),
    301       current_receive_quota_(0),
    302       timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
    303       received_close_code_(0),
    304       state_(FRESHLY_CONSTRUCTED),
    305       notification_sender_(new HandshakeNotificationSender(this)),
    306       sending_text_message_(false),
    307       receiving_text_message_(false),
    308       expecting_to_handle_continuation_(false),
    309       initial_frame_forwarded_(false) {}
    310 
    311 WebSocketChannel::~WebSocketChannel() {
    312   // The stream may hold a pointer to read_frames_, and so it needs to be
    313   // destroyed first.
    314   stream_.reset();
    315   // The timer may have a callback pointing back to us, so stop it just in case
    316   // someone decides to run the event loop from their destructor.
    317   timer_.Stop();
    318 }
    319 
    320 void WebSocketChannel::SendAddChannelRequest(
    321     const GURL& socket_url,
    322     const std::vector<std::string>& requested_subprotocols,
    323     const url::Origin& origin) {
    324   // Delegate to the tested version.
    325   SendAddChannelRequestWithSuppliedCreator(
    326       socket_url,
    327       requested_subprotocols,
    328       origin,
    329       base::Bind(&WebSocketStream::CreateAndConnectStream));
    330 }
    331 
    332 void WebSocketChannel::SetState(State new_state) {
    333   DCHECK_NE(state_, new_state);
    334 
    335   if (new_state == CONNECTED)
    336     established_on_ = base::TimeTicks::Now();
    337   if (state_ == CONNECTED && !established_on_.is_null()) {
    338     UMA_HISTOGRAM_LONG_TIMES(
    339         "Net.WebSocket.Duration", base::TimeTicks::Now() - established_on_);
    340   }
    341 
    342   state_ = new_state;
    343 }
    344 
    345 bool WebSocketChannel::InClosingState() const {
    346   // The state RECV_CLOSED is not supported here, because it is only used in one
    347   // code path and should not leak into the code in general.
    348   DCHECK_NE(RECV_CLOSED, state_)
    349       << "InClosingState called with state_ == RECV_CLOSED";
    350   return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
    351 }
    352 
    353 void WebSocketChannel::SendFrame(bool fin,
    354                                  WebSocketFrameHeader::OpCode op_code,
    355                                  const std::vector<char>& data) {
    356   if (data.size() > INT_MAX) {
    357     NOTREACHED() << "Frame size sanity check failed";
    358     return;
    359   }
    360   if (stream_ == NULL) {
    361     LOG(DFATAL) << "Got SendFrame without a connection established; "
    362                 << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
    363                 << " data.size()=" << data.size();
    364     return;
    365   }
    366   if (InClosingState()) {
    367     DVLOG(1) << "SendFrame called in state " << state_
    368              << ". This may be a bug, or a harmless race.";
    369     return;
    370   }
    371   if (state_ != CONNECTED) {
    372     NOTREACHED() << "SendFrame() called in state " << state_;
    373     return;
    374   }
    375   if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
    376     // TODO(ricea): Kill renderer.
    377     AllowUnused(
    378         FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
    379     // |this| has been deleted.
    380     return;
    381   }
    382   if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
    383     LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
    384                 << "; misbehaving renderer? fin=" << fin
    385                 << " data.size()=" << data.size();
    386     return;
    387   }
    388   if (op_code == WebSocketFrameHeader::kOpCodeText ||
    389       (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
    390        sending_text_message_)) {
    391     StreamingUtf8Validator::State state =
    392         outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
    393     if (state == StreamingUtf8Validator::INVALID ||
    394         (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
    395       // TODO(ricea): Kill renderer.
    396       AllowUnused(
    397           FailChannel("Browser sent a text frame containing invalid UTF-8",
    398                       kWebSocketErrorGoingAway,
    399                       ""));
    400       // |this| has been deleted.
    401       return;
    402     }
    403     sending_text_message_ = !fin;
    404     DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
    405   }
    406   current_send_quota_ -= data.size();
    407   // TODO(ricea): If current_send_quota_ has dropped below
    408   // send_quota_low_water_mark_, it might be good to increase the "low
    409   // water mark" and "high water mark", but only if the link to the WebSocket
    410   // server is not saturated.
    411   scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
    412   std::copy(data.begin(), data.end(), buffer->data());
    413   AllowUnused(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
    414   // |this| may have been deleted.
    415 }
    416 
    417 void WebSocketChannel::SendFlowControl(int64 quota) {
    418   DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
    419          state_ == CLOSE_WAIT);
    420   // TODO(ricea): Kill the renderer if it tries to send us a negative quota
    421   // value or > INT_MAX.
    422   DCHECK_GE(quota, 0);
    423   DCHECK_LE(quota, INT_MAX);
    424   if (!pending_received_frames_.empty()) {
    425     DCHECK_EQ(0, current_receive_quota_);
    426   }
    427   while (!pending_received_frames_.empty() && quota > 0) {
    428     PendingReceivedFrame& front = pending_received_frames_.front();
    429     const size_t data_size = front.size() - front.offset();
    430     const size_t bytes_to_send =
    431         std::min(base::checked_cast<size_t>(quota), data_size);
    432     const bool final = front.final() && data_size == bytes_to_send;
    433     const char* data =
    434         front.data().get() ? front.data()->data() + front.offset() : NULL;
    435     DCHECK(!bytes_to_send || data) << "Non empty data should not be null.";
    436     const std::vector<char> data_vector(data, data + bytes_to_send);
    437     DVLOG(3) << "Sending frame previously split due to quota to the "
    438              << "renderer: quota=" << quota << " data_size=" << data_size
    439              << " bytes_to_send=" << bytes_to_send;
    440     if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
    441         CHANNEL_DELETED)
    442       return;
    443     if (bytes_to_send < data_size) {
    444       front.DidConsume(bytes_to_send);
    445       front.ResetOpcode();
    446       return;
    447     }
    448     const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
    449     DCHECK_GE(quota, signed_bytes_to_send);
    450     quota -= signed_bytes_to_send;
    451 
    452     pending_received_frames_.pop();
    453   }
    454   // If current_receive_quota_ == 0 then there is no pending ReadFrames()
    455   // operation.
    456   const bool start_read =
    457       current_receive_quota_ == 0 && quota > 0 &&
    458       (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
    459   current_receive_quota_ += base::checked_cast<int>(quota);
    460   if (start_read)
    461     AllowUnused(ReadFrames());
    462   // |this| may have been deleted.
    463 }
    464 
    465 void WebSocketChannel::StartClosingHandshake(uint16 code,
    466                                              const std::string& reason) {
    467   if (InClosingState()) {
    468     // When the associated renderer process is killed while the channel is in
    469     // CLOSING state we reach here.
    470     DVLOG(1) << "StartClosingHandshake called in state " << state_
    471              << ". This may be a bug, or a harmless race.";
    472     return;
    473   }
    474   if (state_ == CONNECTING) {
    475     // Abort the in-progress handshake and drop the connection immediately.
    476     stream_request_.reset();
    477     SetState(CLOSED);
    478     AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
    479     return;
    480   }
    481   if (state_ != CONNECTED) {
    482     NOTREACHED() << "StartClosingHandshake() called in state " << state_;
    483     return;
    484   }
    485   // Javascript actually only permits 1000 and 3000-4999, but the implementation
    486   // itself may produce different codes. The length of |reason| is also checked
    487   // by Javascript.
    488   if (!IsStrictlyValidCloseStatusCode(code) ||
    489       reason.size() > kMaximumCloseReasonLength) {
    490     // "InternalServerError" is actually used for errors from any endpoint, per
    491     // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
    492     // reason it must be malfunctioning in some way, and based on that we
    493     // interpret this as an internal error.
    494     if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED) {
    495       DCHECK_EQ(CONNECTED, state_);
    496       SetState(SEND_CLOSED);
    497     }
    498     return;
    499   }
    500   if (SendClose(
    501           code,
    502           StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
    503       CHANNEL_DELETED)
    504     return;
    505   DCHECK_EQ(CONNECTED, state_);
    506   SetState(SEND_CLOSED);
    507 }
    508 
    509 void WebSocketChannel::SendAddChannelRequestForTesting(
    510     const GURL& socket_url,
    511     const std::vector<std::string>& requested_subprotocols,
    512     const url::Origin& origin,
    513     const WebSocketStreamCreator& creator) {
    514   SendAddChannelRequestWithSuppliedCreator(
    515       socket_url, requested_subprotocols, origin, creator);
    516 }
    517 
    518 void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
    519     base::TimeDelta delay) {
    520   timeout_ = delay;
    521 }
    522 
    523 void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
    524     const GURL& socket_url,
    525     const std::vector<std::string>& requested_subprotocols,
    526     const url::Origin& origin,
    527     const WebSocketStreamCreator& creator) {
    528   DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
    529   if (!socket_url.SchemeIsWSOrWSS()) {
    530     // TODO(ricea): Kill the renderer (this error should have been caught by
    531     // Javascript).
    532     AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
    533     // |this| is deleted here.
    534     return;
    535   }
    536   socket_url_ = socket_url;
    537   scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
    538       new ConnectDelegate(this));
    539   stream_request_ = creator.Run(socket_url_,
    540                                 requested_subprotocols,
    541                                 origin,
    542                                 url_request_context_,
    543                                 BoundNetLog(),
    544                                 connect_delegate.Pass());
    545   SetState(CONNECTING);
    546 }
    547 
    548 void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
    549   DCHECK(stream);
    550   DCHECK_EQ(CONNECTING, state_);
    551 
    552   stream_ = stream.Pass();
    553 
    554   SetState(CONNECTED);
    555 
    556   if (event_interface_->OnAddChannelResponse(
    557           false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
    558       CHANNEL_DELETED)
    559     return;
    560 
    561   // TODO(ricea): Get flow control information from the WebSocketStream once we
    562   // have a multiplexing WebSocketStream.
    563   current_send_quota_ = send_quota_high_water_mark_;
    564   if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
    565       CHANNEL_DELETED)
    566     return;
    567 
    568   // |stream_request_| is not used once the connection has succeeded.
    569   stream_request_.reset();
    570 
    571   AllowUnused(ReadFrames());
    572   // |this| may have been deleted.
    573 }
    574 
    575 void WebSocketChannel::OnConnectFailure(const std::string& message) {
    576   DCHECK_EQ(CONNECTING, state_);
    577 
    578   // Copy the message before we delete its owner.
    579   std::string message_copy = message;
    580 
    581   SetState(CLOSED);
    582   stream_request_.reset();
    583 
    584   if (CHANNEL_DELETED ==
    585       notification_sender_->SendImmediately(event_interface_.get())) {
    586     // |this| has been deleted.
    587     return;
    588   }
    589   AllowUnused(event_interface_->OnFailChannel(message_copy));
    590   // |this| has been deleted.
    591 }
    592 
    593 void WebSocketChannel::OnSSLCertificateError(
    594     scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,
    595     const SSLInfo& ssl_info,
    596     bool fatal) {
    597   AllowUnused(event_interface_->OnSSLCertificateError(
    598       ssl_error_callbacks.Pass(), socket_url_, ssl_info, fatal));
    599 }
    600 
    601 void WebSocketChannel::OnStartOpeningHandshake(
    602     scoped_ptr<WebSocketHandshakeRequestInfo> request) {
    603   DCHECK(!notification_sender_->handshake_request_info());
    604 
    605   // Because it is hard to handle an IPC error synchronously is difficult,
    606   // we asynchronously notify the information.
    607   notification_sender_->set_handshake_request_info(request.Pass());
    608   ScheduleOpeningHandshakeNotification();
    609 }
    610 
    611 void WebSocketChannel::OnFinishOpeningHandshake(
    612     scoped_ptr<WebSocketHandshakeResponseInfo> response) {
    613   DCHECK(!notification_sender_->handshake_response_info());
    614 
    615   // Because it is hard to handle an IPC error synchronously is difficult,
    616   // we asynchronously notify the information.
    617   notification_sender_->set_handshake_response_info(response.Pass());
    618   ScheduleOpeningHandshakeNotification();
    619 }
    620 
    621 void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
    622   base::MessageLoop::current()->PostTask(
    623       FROM_HERE,
    624       base::Bind(HandshakeNotificationSender::Send,
    625                  notification_sender_->AsWeakPtr()));
    626 }
    627 
    628 ChannelState WebSocketChannel::WriteFrames() {
    629   int result = OK;
    630   do {
    631     // This use of base::Unretained is safe because this object owns the
    632     // WebSocketStream and destroying it cancels all callbacks.
    633     result = stream_->WriteFrames(
    634         data_being_sent_->frames(),
    635         base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
    636                    base::Unretained(this),
    637                    false));
    638     if (result != ERR_IO_PENDING) {
    639       if (OnWriteDone(true, result) == CHANNEL_DELETED)
    640         return CHANNEL_DELETED;
    641       // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
    642       // guaranteed to be the same as before OnWriteDone() call.
    643     }
    644   } while (result == OK && data_being_sent_);
    645   return CHANNEL_ALIVE;
    646 }
    647 
    648 ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
    649   DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
    650   DCHECK_NE(CONNECTING, state_);
    651   DCHECK_NE(ERR_IO_PENDING, result);
    652   DCHECK(data_being_sent_);
    653   switch (result) {
    654     case OK:
    655       if (data_to_send_next_) {
    656         data_being_sent_ = data_to_send_next_.Pass();
    657         if (!synchronous)
    658           return WriteFrames();
    659       } else {
    660         data_being_sent_.reset();
    661         if (current_send_quota_ < send_quota_low_water_mark_) {
    662           // TODO(ricea): Increase low_water_mark and high_water_mark if
    663           // throughput is high, reduce them if throughput is low.  Low water
    664           // mark needs to be >= the bandwidth delay product *of the IPC
    665           // channel*. Because factors like context-switch time, thread wake-up
    666           // time, and bus speed come into play it is complex and probably needs
    667           // to be determined empirically.
    668           DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
    669           // TODO(ricea): Truncate quota by the quota specified by the remote
    670           // server, if the protocol in use supports quota.
    671           int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
    672           current_send_quota_ += fresh_quota;
    673           return event_interface_->OnFlowControl(fresh_quota);
    674         }
    675       }
    676       return CHANNEL_ALIVE;
    677 
    678     // If a recoverable error condition existed, it would go here.
    679 
    680     default:
    681       DCHECK_LT(result, 0)
    682           << "WriteFrames() should only return OK or ERR_ codes";
    683 
    684       stream_->Close();
    685       SetState(CLOSED);
    686       return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
    687   }
    688 }
    689 
    690 ChannelState WebSocketChannel::ReadFrames() {
    691   int result = OK;
    692   while (result == OK && current_receive_quota_ > 0) {
    693     // This use of base::Unretained is safe because this object owns the
    694     // WebSocketStream, and any pending reads will be cancelled when it is
    695     // destroyed.
    696     result = stream_->ReadFrames(
    697         &read_frames_,
    698         base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
    699                    base::Unretained(this),
    700                    false));
    701     if (result != ERR_IO_PENDING) {
    702       if (OnReadDone(true, result) == CHANNEL_DELETED)
    703         return CHANNEL_DELETED;
    704     }
    705     DCHECK_NE(CLOSED, state_);
    706   }
    707   return CHANNEL_ALIVE;
    708 }
    709 
    710 ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
    711   DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
    712   DCHECK_NE(CONNECTING, state_);
    713   DCHECK_NE(ERR_IO_PENDING, result);
    714   switch (result) {
    715     case OK:
    716       // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
    717       // with no data read, not an empty response.
    718       DCHECK(!read_frames_.empty())
    719           << "ReadFrames() returned OK, but nothing was read.";
    720       for (size_t i = 0; i < read_frames_.size(); ++i) {
    721         scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
    722         read_frames_[i] = NULL;
    723         if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
    724           return CHANNEL_DELETED;
    725       }
    726       read_frames_.clear();
    727       // There should always be a call to ReadFrames pending.
    728       // TODO(ricea): Unless we are out of quota.
    729       DCHECK_NE(CLOSED, state_);
    730       if (!synchronous)
    731         return ReadFrames();
    732       return CHANNEL_ALIVE;
    733 
    734     case ERR_WS_PROTOCOL_ERROR:
    735       // This could be kWebSocketErrorProtocolError (specifically, non-minimal
    736       // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
    737       // extension-specific error.
    738       return FailChannel("Invalid frame header",
    739                          kWebSocketErrorProtocolError,
    740                          "WebSocket Protocol Error");
    741 
    742     default:
    743       DCHECK_LT(result, 0)
    744           << "ReadFrames() should only return OK or ERR_ codes";
    745 
    746       stream_->Close();
    747       SetState(CLOSED);
    748 
    749       uint16 code = kWebSocketErrorAbnormalClosure;
    750       std::string reason = "";
    751       bool was_clean = false;
    752       if (received_close_code_ != 0) {
    753         code = received_close_code_;
    754         reason = received_close_reason_;
    755         was_clean = (result == ERR_CONNECTION_CLOSED);
    756       }
    757 
    758       return DoDropChannel(was_clean, code, reason);
    759   }
    760 }
    761 
    762 ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
    763   if (frame->header.masked) {
    764     // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
    765     // masked frame."
    766     return FailChannel(
    767         "A server must not mask any frames that it sends to the "
    768         "client.",
    769         kWebSocketErrorProtocolError,
    770         "Masked frame from server");
    771   }
    772   const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
    773   DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
    774          frame->header.final);
    775   if (frame->header.reserved1 || frame->header.reserved2 ||
    776       frame->header.reserved3) {
    777     return FailChannel(base::StringPrintf(
    778                            "One or more reserved bits are on: reserved1 = %d, "
    779                            "reserved2 = %d, reserved3 = %d",
    780                            static_cast<int>(frame->header.reserved1),
    781                            static_cast<int>(frame->header.reserved2),
    782                            static_cast<int>(frame->header.reserved3)),
    783                        kWebSocketErrorProtocolError,
    784                        "Invalid reserved bit");
    785   }
    786 
    787   // Respond to the frame appropriately to its type.
    788   return HandleFrameByState(
    789       opcode, frame->header.final, frame->data, frame->header.payload_length);
    790 }
    791 
    792 ChannelState WebSocketChannel::HandleFrameByState(
    793     const WebSocketFrameHeader::OpCode opcode,
    794     bool final,
    795     const scoped_refptr<IOBuffer>& data_buffer,
    796     size_t size) {
    797   DCHECK_NE(RECV_CLOSED, state_)
    798       << "HandleFrame() does not support being called re-entrantly from within "
    799          "SendClose()";
    800   DCHECK_NE(CLOSED, state_);
    801   if (state_ == CLOSE_WAIT) {
    802     std::string frame_name;
    803     GetFrameTypeForOpcode(opcode, &frame_name);
    804 
    805     // FailChannel() won't send another Close frame.
    806     return FailChannel(
    807         frame_name + " received after close", kWebSocketErrorProtocolError, "");
    808   }
    809   switch (opcode) {
    810     case WebSocketFrameHeader::kOpCodeText:  // fall-thru
    811     case WebSocketFrameHeader::kOpCodeBinary:
    812     case WebSocketFrameHeader::kOpCodeContinuation:
    813       return HandleDataFrame(opcode, final, data_buffer, size);
    814 
    815     case WebSocketFrameHeader::kOpCodePing:
    816       DVLOG(1) << "Got Ping of size " << size;
    817       if (state_ == CONNECTED)
    818         return SendFrameFromIOBuffer(
    819             true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
    820       DVLOG(3) << "Ignored ping in state " << state_;
    821       return CHANNEL_ALIVE;
    822 
    823     case WebSocketFrameHeader::kOpCodePong:
    824       DVLOG(1) << "Got Pong of size " << size;
    825       // There is no need to do anything with pong messages.
    826       return CHANNEL_ALIVE;
    827 
    828     case WebSocketFrameHeader::kOpCodeClose: {
    829       // TODO(ricea): If there is a message which is queued for transmission to
    830       // the renderer, then the renderer should not receive an
    831       // OnClosingHandshake or OnDropChannel IPC until the queued message has
    832       // been completedly transmitted.
    833       uint16 code = kWebSocketNormalClosure;
    834       std::string reason;
    835       std::string message;
    836       if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
    837         return FailChannel(message, code, reason);
    838       }
    839       // TODO(ricea): Find a way to safely log the message from the close
    840       // message (escape control codes and so on).
    841       DVLOG(1) << "Got Close with code " << code;
    842       switch (state_) {
    843         case CONNECTED:
    844           SetState(RECV_CLOSED);
    845           if (SendClose(code, reason) == CHANNEL_DELETED)
    846             return CHANNEL_DELETED;
    847           DCHECK_EQ(RECV_CLOSED, state_);
    848           SetState(CLOSE_WAIT);
    849 
    850           if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
    851             return CHANNEL_DELETED;
    852           received_close_code_ = code;
    853           received_close_reason_ = reason;
    854           break;
    855 
    856         case SEND_CLOSED:
    857           SetState(CLOSE_WAIT);
    858           // From RFC6455 section 7.1.5: "Each endpoint
    859           // will see the status code sent by the other end as _The WebSocket
    860           // Connection Close Code_."
    861           received_close_code_ = code;
    862           received_close_reason_ = reason;
    863           break;
    864 
    865         default:
    866           LOG(DFATAL) << "Got Close in unexpected state " << state_;
    867           break;
    868       }
    869       return CHANNEL_ALIVE;
    870     }
    871 
    872     default:
    873       return FailChannel(
    874           base::StringPrintf("Unrecognized frame opcode: %d", opcode),
    875           kWebSocketErrorProtocolError,
    876           "Unknown opcode");
    877   }
    878 }
    879 
    880 ChannelState WebSocketChannel::HandleDataFrame(
    881     WebSocketFrameHeader::OpCode opcode,
    882     bool final,
    883     const scoped_refptr<IOBuffer>& data_buffer,
    884     size_t size) {
    885   if (state_ != CONNECTED) {
    886     DVLOG(3) << "Ignored data packet received in state " << state_;
    887     return CHANNEL_ALIVE;
    888   }
    889   DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
    890          opcode == WebSocketFrameHeader::kOpCodeText ||
    891          opcode == WebSocketFrameHeader::kOpCodeBinary);
    892   const bool got_continuation =
    893       (opcode == WebSocketFrameHeader::kOpCodeContinuation);
    894   if (got_continuation != expecting_to_handle_continuation_) {
    895     const std::string console_log = got_continuation
    896         ? "Received unexpected continuation frame."
    897         : "Received start of new message but previous message is unfinished.";
    898     const std::string reason = got_continuation
    899         ? "Unexpected continuation"
    900         : "Previous data frame unfinished";
    901     return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
    902   }
    903   expecting_to_handle_continuation_ = !final;
    904   WebSocketFrameHeader::OpCode opcode_to_send = opcode;
    905   if (!initial_frame_forwarded_ &&
    906       opcode == WebSocketFrameHeader::kOpCodeContinuation) {
    907     opcode_to_send = receiving_text_message_
    908                          ? WebSocketFrameHeader::kOpCodeText
    909                          : WebSocketFrameHeader::kOpCodeBinary;
    910   }
    911   if (opcode == WebSocketFrameHeader::kOpCodeText ||
    912       (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
    913        receiving_text_message_)) {
    914     // This call is not redundant when size == 0 because it tells us what
    915     // the current state is.
    916     StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
    917         size ? data_buffer->data() : NULL, size);
    918     if (state == StreamingUtf8Validator::INVALID ||
    919         (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
    920       return FailChannel("Could not decode a text frame as UTF-8.",
    921                          kWebSocketErrorProtocolError,
    922                          "Invalid UTF-8 in text frame");
    923     }
    924     receiving_text_message_ = !final;
    925     DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
    926   }
    927   if (size == 0U && !final)
    928     return CHANNEL_ALIVE;
    929 
    930   initial_frame_forwarded_ = !final;
    931   if (size > base::checked_cast<size_t>(current_receive_quota_) ||
    932       !pending_received_frames_.empty()) {
    933     const bool no_quota = (current_receive_quota_ == 0);
    934     DCHECK(no_quota || pending_received_frames_.empty());
    935     DVLOG(3) << "Queueing frame to renderer due to quota. quota="
    936              << current_receive_quota_ << " size=" << size;
    937     WebSocketFrameHeader::OpCode opcode_to_queue =
    938         no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
    939     pending_received_frames_.push(PendingReceivedFrame(
    940         final, opcode_to_queue, data_buffer, current_receive_quota_, size));
    941     if (no_quota)
    942       return CHANNEL_ALIVE;
    943     size = current_receive_quota_;
    944     final = false;
    945   }
    946 
    947   // TODO(ricea): Can this copy be eliminated?
    948   const char* const data_begin = size ? data_buffer->data() : NULL;
    949   const char* const data_end = data_begin + size;
    950   const std::vector<char> data(data_begin, data_end);
    951   current_receive_quota_ -= size;
    952   DCHECK_GE(current_receive_quota_, 0);
    953 
    954   // Sends the received frame to the renderer process.
    955   return event_interface_->OnDataFrame(final, opcode_to_send, data);
    956 }
    957 
    958 ChannelState WebSocketChannel::SendFrameFromIOBuffer(
    959     bool fin,
    960     WebSocketFrameHeader::OpCode op_code,
    961     const scoped_refptr<IOBuffer>& buffer,
    962     size_t size) {
    963   DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
    964   DCHECK(stream_);
    965 
    966   scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
    967   WebSocketFrameHeader& header = frame->header;
    968   header.final = fin;
    969   header.masked = true;
    970   header.payload_length = size;
    971   frame->data = buffer;
    972 
    973   if (data_being_sent_) {
    974     // Either the link to the WebSocket server is saturated, or several messages
    975     // are being sent in a batch.
    976     // TODO(ricea): Keep some statistics to work out the situation and adjust
    977     // quota appropriately.
    978     if (!data_to_send_next_)
    979       data_to_send_next_.reset(new SendBuffer);
    980     data_to_send_next_->AddFrame(frame.Pass());
    981     return CHANNEL_ALIVE;
    982   }
    983 
    984   data_being_sent_.reset(new SendBuffer);
    985   data_being_sent_->AddFrame(frame.Pass());
    986   return WriteFrames();
    987 }
    988 
    989 ChannelState WebSocketChannel::FailChannel(const std::string& message,
    990                                            uint16 code,
    991                                            const std::string& reason) {
    992   DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
    993   DCHECK_NE(CONNECTING, state_);
    994   DCHECK_NE(CLOSED, state_);
    995 
    996   // TODO(ricea): Logging.
    997   if (state_ == CONNECTED) {
    998     if (SendClose(code, reason) == CHANNEL_DELETED)
    999       return CHANNEL_DELETED;
   1000   }
   1001 
   1002   // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser
   1003   // should close the connection itself without waiting for the closing
   1004   // handshake.
   1005   stream_->Close();
   1006   SetState(CLOSED);
   1007   return event_interface_->OnFailChannel(message);
   1008 }
   1009 
   1010 ChannelState WebSocketChannel::SendClose(uint16 code,
   1011                                          const std::string& reason) {
   1012   DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
   1013   DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
   1014   scoped_refptr<IOBuffer> body;
   1015   size_t size = 0;
   1016   if (code == kWebSocketErrorNoStatusReceived) {
   1017     // Special case: translate kWebSocketErrorNoStatusReceived into a Close
   1018     // frame with no payload.
   1019     DCHECK(reason.empty());
   1020     body = new IOBuffer(0);
   1021   } else {
   1022     const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
   1023     body = new IOBuffer(payload_length);
   1024     size = payload_length;
   1025     base::WriteBigEndian(body->data(), code);
   1026     COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
   1027                    they_should_both_be_two);
   1028     std::copy(
   1029         reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
   1030   }
   1031   // This use of base::Unretained() is safe because we stop the timer in the
   1032   // destructor.
   1033   timer_.Start(
   1034       FROM_HERE,
   1035       timeout_,
   1036       base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
   1037   if (SendFrameFromIOBuffer(
   1038           true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
   1039       CHANNEL_DELETED)
   1040     return CHANNEL_DELETED;
   1041   return CHANNEL_ALIVE;
   1042 }
   1043 
   1044 bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
   1045                                   size_t size,
   1046                                   uint16* code,
   1047                                   std::string* reason,
   1048                                   std::string* message) {
   1049   reason->clear();
   1050   if (size < kWebSocketCloseCodeLength) {
   1051     if (size == 0U) {
   1052       *code = kWebSocketErrorNoStatusReceived;
   1053       return true;
   1054     }
   1055 
   1056     DVLOG(1) << "Close frame with payload size " << size << " received "
   1057              << "(the first byte is " << std::hex
   1058              << static_cast<int>(buffer->data()[0]) << ")";
   1059     *code = kWebSocketErrorProtocolError;
   1060     *message =
   1061         "Received a broken close frame containing an invalid size body.";
   1062     return false;
   1063   }
   1064 
   1065   const char* data = buffer->data();
   1066   uint16 unchecked_code = 0;
   1067   base::ReadBigEndian(data, &unchecked_code);
   1068   COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
   1069                  they_should_both_be_two_bytes);
   1070 
   1071   switch (unchecked_code) {
   1072     case kWebSocketErrorNoStatusReceived:
   1073     case kWebSocketErrorAbnormalClosure:
   1074     case kWebSocketErrorTlsHandshake:
   1075       *code = kWebSocketErrorProtocolError;
   1076       *message =
   1077           "Received a broken close frame containing a reserved status code.";
   1078       return false;
   1079 
   1080     default:
   1081       *code = unchecked_code;
   1082       break;
   1083   }
   1084 
   1085   std::string text(data + kWebSocketCloseCodeLength, data + size);
   1086   if (StreamingUtf8Validator::Validate(text)) {
   1087     reason->swap(text);
   1088     return true;
   1089   }
   1090 
   1091   *code = kWebSocketErrorProtocolError;
   1092   *reason = "Invalid UTF-8 in Close frame";
   1093   *message = "Received a broken close frame containing invalid UTF-8.";
   1094   return false;
   1095 }
   1096 
   1097 ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
   1098                                              uint16 code,
   1099                                              const std::string& reason) {
   1100   if (CHANNEL_DELETED ==
   1101       notification_sender_->SendImmediately(event_interface_.get()))
   1102     return CHANNEL_DELETED;
   1103   ChannelState result =
   1104       event_interface_->OnDropChannel(was_clean, code, reason);
   1105   DCHECK_EQ(CHANNEL_DELETED, result);
   1106   return result;
   1107 }
   1108 
   1109 void WebSocketChannel::CloseTimeout() {
   1110   stream_->Close();
   1111   SetState(CLOSED);
   1112   AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
   1113   // |this| has been deleted.
   1114 }
   1115 
   1116 }  // namespace net
   1117