// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/websockets/websocket_channel.h"

#include <limits.h>  // for INT_MAX

#include <algorithm>
#include <deque>

#include "base/basictypes.h"  // for size_t
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram.h"
#include "base/numerics/safe_conversions.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/time/time.h"
#include "net/base/io_buffer.h"
#include "net/base/net_log.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_util.h"
#include "net/websockets/websocket_errors.h"
#include "net/websockets/websocket_event_interface.h"
#include "net/websockets/websocket_frame.h"
#include "net/websockets/websocket_handshake_request_info.h"
#include "net/websockets/websocket_handshake_response_info.h"
#include "net/websockets/websocket_mux.h"
#include "net/websockets/websocket_stream.h"
#include "url/origin.h"

namespace net {

namespace {

using base::StreamingUtf8Validator;

// Initial values for WebSocketChannel's send quota low and high water marks
// (65536 and 131072 respectively). The send quota is compared against
// data.size() in SendFrame().
const int kDefaultSendQuotaLowWaterMark = 1 << 16;
const int kDefaultSendQuotaHighWaterMark = 1 << 17;
// Length of the status code at the start of a Close frame payload; it is
// subtracted from the maximum control frame payload when computing the
// maximum close reason length.
const size_t kWebSocketCloseCodeLength = 2;
// This timeout is based on TCPMaximumSegmentLifetime * 2 from
// MainThreadWebSocketChannel.cpp in Blink.
49 const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60; 50 51 typedef WebSocketEventInterface::ChannelState ChannelState; 52 const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE; 53 const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED; 54 55 // Maximum close reason length = max control frame payload - 56 // status code length 57 // = 125 - 2 58 const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength; 59 60 // Check a close status code for strict compliance with RFC6455. This is only 61 // used for close codes received from a renderer that we are intending to send 62 // out over the network. See ParseClose() for the restrictions on incoming close 63 // codes. The |code| parameter is type int for convenience of implementation; 64 // the real type is uint16. Code 1005 is treated specially; it cannot be set 65 // explicitly by Javascript but the renderer uses it to indicate we should send 66 // a Close frame with no payload. 67 bool IsStrictlyValidCloseStatusCode(int code) { 68 static const int kInvalidRanges[] = { 69 // [BAD, OK) 70 0, 1000, // 1000 is the first valid code 71 1006, 1007, // 1006 MUST NOT be set. 72 1014, 3000, // 1014 unassigned; 1015 up to 2999 are reserved. 73 5000, 65536, // Codes above 5000 are invalid. 74 }; 75 const int* const kInvalidRangesEnd = 76 kInvalidRanges + arraysize(kInvalidRanges); 77 78 DCHECK_GE(code, 0); 79 DCHECK_LT(code, 65536); 80 const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code); 81 DCHECK_NE(kInvalidRangesEnd, upper); 82 DCHECK_GT(upper, kInvalidRanges); 83 DCHECK_GT(*upper, code); 84 DCHECK_LE(*(upper - 1), code); 85 return ((upper - kInvalidRanges) % 2) == 0; 86 } 87 88 // This function avoids a bunch of boilerplate code. 89 void AllowUnused(ChannelState ALLOW_UNUSED unused) {} 90 91 // Sets |name| to the name of the frame type for the given |opcode|. 
Note that 92 // for all of Text, Binary and Continuation opcode, this method returns 93 // "Data frame". 94 void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode, 95 std::string* name) { 96 switch (opcode) { 97 case WebSocketFrameHeader::kOpCodeText: // fall-thru 98 case WebSocketFrameHeader::kOpCodeBinary: // fall-thru 99 case WebSocketFrameHeader::kOpCodeContinuation: 100 *name = "Data frame"; 101 break; 102 103 case WebSocketFrameHeader::kOpCodePing: 104 *name = "Ping"; 105 break; 106 107 case WebSocketFrameHeader::kOpCodePong: 108 *name = "Pong"; 109 break; 110 111 case WebSocketFrameHeader::kOpCodeClose: 112 *name = "Close"; 113 break; 114 115 default: 116 *name = "Unknown frame type"; 117 break; 118 } 119 120 return; 121 } 122 123 } // namespace 124 125 // A class to encapsulate a set of frames and information about the size of 126 // those frames. 127 class WebSocketChannel::SendBuffer { 128 public: 129 SendBuffer() : total_bytes_(0) {} 130 131 // Add a WebSocketFrame to the buffer and increase total_bytes_. 132 void AddFrame(scoped_ptr<WebSocketFrame> chunk); 133 134 // Return a pointer to the frames_ for write purposes. 135 ScopedVector<WebSocketFrame>* frames() { return &frames_; } 136 137 private: 138 // The frames_ that will be sent in the next call to WriteFrames(). 139 ScopedVector<WebSocketFrame> frames_; 140 141 // The total size of the payload data in |frames_|. This will be used to 142 // measure the throughput of the link. 143 // TODO(ricea): Measure the throughput of the link. 144 size_t total_bytes_; 145 }; 146 147 void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) { 148 total_bytes_ += frame->header.payload_length; 149 frames_.push_back(frame.release()); 150 } 151 152 // Implementation of WebSocketStream::ConnectDelegate that simply forwards the 153 // calls on to the WebSocketChannel that created it. 
class WebSocketChannel::ConnectDelegate
    : public WebSocketStream::ConnectDelegate {
 public:
  explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}

  virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
    creator_->OnConnectSuccess(stream.Pass());
    // |this| may have been deleted.
  }

  virtual void OnFailure(const std::string& message) OVERRIDE {
    creator_->OnConnectFailure(message);
    // |this| has been deleted.
  }

  virtual void OnStartOpeningHandshake(
      scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
    creator_->OnStartOpeningHandshake(request.Pass());
  }

  virtual void OnFinishOpeningHandshake(
      scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
    creator_->OnFinishOpeningHandshake(response.Pass());
  }

  virtual void OnSSLCertificateError(
      scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks>
          ssl_error_callbacks,
      const SSLInfo& ssl_info,
      bool fatal) OVERRIDE {
    creator_->OnSSLCertificateError(
        ssl_error_callbacks.Pass(), ssl_info, fatal);
  }

 private:
  // A pointer to the WebSocketChannel that created this object. There is no
  // danger of this pointer being stale, because deleting the WebSocketChannel
  // cancels the connect process, deleting this object and preventing its
  // callbacks from being called.
  WebSocketChannel* const creator_;

  DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
};

// Stores the opening handshake request and response information until it is
// handed to the WebSocketEventInterface, either from a posted task (Send()) or
// synchronously (SendImmediately()).
class WebSocketChannel::HandshakeNotificationSender
    : public base::SupportsWeakPtr<HandshakeNotificationSender> {
 public:
  explicit HandshakeNotificationSender(WebSocketChannel* channel);
  ~HandshakeNotificationSender();

  // Calls SendImmediately() on |sender| with its owner's event interface, or
  // does nothing if |sender| no longer exists.
  static void Send(base::WeakPtr<HandshakeNotificationSender> sender);

  // Passes whatever request and response information is currently stored to
  // |event_interface|. Returns CHANNEL_DELETED as soon as one of the callbacks
  // returns it, and CHANNEL_ALIVE otherwise.
  ChannelState SendImmediately(WebSocketEventInterface* event_interface);

  const WebSocketHandshakeRequestInfo* handshake_request_info() const {
    return handshake_request_info_.get();
  }

  void set_handshake_request_info(
      scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
    handshake_request_info_ = request_info.Pass();
  }

  const WebSocketHandshakeResponseInfo* handshake_response_info() const {
    return handshake_response_info_.get();
  }

  void set_handshake_response_info(
      scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
    handshake_response_info_ = response_info.Pass();
  }

 private:
  // The channel passed to the constructor; Send() uses it to reach the event
  // interface.
  WebSocketChannel* owner_;
  scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
  scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
};

WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
    WebSocketChannel* channel)
    : owner_(channel) {}

WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}

void WebSocketChannel::HandshakeNotificationSender::Send(
    base::WeakPtr<HandshakeNotificationSender> sender) {
  // Do nothing if |sender| is already destructed.
  if (sender) {
    WebSocketChannel* channel = sender->owner_;
    AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
  }
}

ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
    WebSocketEventInterface* event_interface) {

  // Each Pass() call hands the stored info over to the callback, so the same
  // info is not passed on twice.
  if (handshake_request_info_.get()) {
    if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
                               handshake_request_info_.Pass()))
      return CHANNEL_DELETED;
  }

  if (handshake_response_info_.get()) {
    if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
                               handshake_response_info_.Pass()))
      return CHANNEL_DELETED;

    // TODO(yhirano): We can release |this| to save memory because
    // there will be no more opening handshake notification.
  }

  return CHANNEL_ALIVE;
}

// Records the frame attributes and the buffer holding its data. |offset| is
// the position within |data| of the first byte that has not been consumed
// yet, and |size| is the total size of the data.
WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
    bool final,
    WebSocketFrameHeader::OpCode opcode,
    const scoped_refptr<IOBuffer>& data,
    size_t offset,
    size_t size)
    : final_(final),
      opcode_(opcode),
      data_(data),
      offset_(offset),
      size_(size) {}

WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}

// Turns the stored opcode into a continuation opcode. SendFlowControl() calls
// this after delivering only part of the frame.
void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
  DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
  opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
}

// Advances the offset past |bytes| bytes. |bytes| must not exceed the amount
// of data remaining (size_ - offset_).
void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
  DCHECK_LE(offset_, size_);
  DCHECK_LE(bytes, size_ - offset_);
  offset_ += bytes;
}

// Takes ownership of |event_interface|. The channel starts in the
// FRESHLY_CONSTRUCTED state with both quotas at zero and the default water
// marks and closing handshake timeout.
WebSocketChannel::WebSocketChannel(
    scoped_ptr<WebSocketEventInterface> event_interface,
    URLRequestContext* url_request_context)
    : event_interface_(event_interface.Pass()),
      url_request_context_(url_request_context),
      send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
      send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
      current_send_quota_(0),
      current_receive_quota_(0),
      timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
      received_close_code_(0),
      state_(FRESHLY_CONSTRUCTED),
      notification_sender_(new HandshakeNotificationSender(this)),
      sending_text_message_(false),
      receiving_text_message_(false),
      expecting_to_handle_continuation_(false),
      initial_frame_forwarded_(false) {}

WebSocketChannel::~WebSocketChannel() {
  // The stream may hold a pointer to read_frames_, and so it needs to be
  // destroyed first.
  stream_.reset();
  // The timer may have a callback pointing back to us, so stop it just in case
  // someone decides to run the event loop from their destructor.
  timer_.Stop();
}

// Starts connecting to |socket_url| using the production stream creator,
// WebSocketStream::CreateAndConnectStream.
void WebSocketChannel::SendAddChannelRequest(
    const GURL& socket_url,
    const std::vector<std::string>& requested_subprotocols,
    const url::Origin& origin) {
  // Delegate to the tested version.
  SendAddChannelRequestWithSuppliedCreator(
      socket_url,
      requested_subprotocols,
      origin,
      base::Bind(&WebSocketStream::CreateAndConnectStream));
}

// Changes |state_| to |new_state|, which must differ from the current state.
// Entering CONNECTED records the current time; leaving CONNECTED reports the
// elapsed time to the "Net.WebSocket.Duration" histogram.
void WebSocketChannel::SetState(State new_state) {
  DCHECK_NE(state_, new_state);

  if (new_state == CONNECTED)
    established_on_ = base::TimeTicks::Now();
  if (state_ == CONNECTED && !established_on_.is_null()) {
    UMA_HISTOGRAM_LONG_TIMES(
        "Net.WebSocket.Duration", base::TimeTicks::Now() - established_on_);
  }

  state_ = new_state;
}

// Returns true if the state is SEND_CLOSED, CLOSE_WAIT or CLOSED.
bool WebSocketChannel::InClosingState() const {
  // The state RECV_CLOSED is not supported here, because it is only used in one
  // code path and should not leak into the code in general.
  DCHECK_NE(RECV_CLOSED, state_)
      << "InClosingState called with state_ == RECV_CLOSED";
  return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
}

// Sends a data frame with the given |fin| flag, |op_code| and payload |data|.
// The request is dropped (with logging) if the frame is too large, there is no
// stream, the channel is closing or not connected, or |op_code| is not a known
// data opcode. The channel is failed if |data| exceeds the current send quota
// or if a text message contains invalid UTF-8. On success the send quota is
// reduced by data.size() and the data is copied into a new IOBuffer for
// SendFrameFromIOBuffer().
void WebSocketChannel::SendFrame(bool fin,
                                 WebSocketFrameHeader::OpCode op_code,
                                 const std::vector<char>& data) {
  if (data.size() > INT_MAX) {
    NOTREACHED() << "Frame size sanity check failed";
    return;
  }
  if (stream_ == NULL) {
    LOG(DFATAL) << "Got SendFrame without a connection established; "
                << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
                << " data.size()=" << data.size();
    return;
  }
  if (InClosingState()) {
    DVLOG(1) << "SendFrame called in state " << state_
             << ". This may be a bug, or a harmless race.";
    return;
  }
  if (state_ != CONNECTED) {
    NOTREACHED() << "SendFrame() called in state " << state_;
    return;
  }
  if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
    // TODO(ricea): Kill renderer.
    AllowUnused(
        FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
    // |this| has been deleted.
    return;
  }
  if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
    LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
                << "; misbehaving renderer? fin=" << fin
                << " data.size()=" << data.size();
    return;
  }
  // Text frames, and continuation frames of a text message, are fed through
  // the streaming validator; a final frame must end on a complete character.
  if (op_code == WebSocketFrameHeader::kOpCodeText ||
      (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
       sending_text_message_)) {
    StreamingUtf8Validator::State state =
        outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
    if (state == StreamingUtf8Validator::INVALID ||
        (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
      // TODO(ricea): Kill renderer.
      AllowUnused(
          FailChannel("Browser sent a text frame containing invalid UTF-8",
                      kWebSocketErrorGoingAway,
                      ""));
      // |this| has been deleted.
      return;
    }
    sending_text_message_ = !fin;
    DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
  }
  current_send_quota_ -= data.size();
  // TODO(ricea): If current_send_quota_ has dropped below
  // send_quota_low_water_mark_, it might be good to increase the "low
  // water mark" and "high water mark", but only if the link to the WebSocket
  // server is not saturated.
  scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
  std::copy(data.begin(), data.end(), buffer->data());
  AllowUnused(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
  // |this| may have been deleted.
}

// Grants |quota| additional receive quota. Frames queued in
// |pending_received_frames_| are delivered to the event interface first, each
// consuming quota; if the quota runs out part-way through a frame, the
// remainder stays queued as a continuation. Any quota left over is added to
// |current_receive_quota_|, and ReadFrames() is started if there was no quota
// before and the state permits reading.
void WebSocketChannel::SendFlowControl(int64 quota) {
  DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
         state_ == CLOSE_WAIT);
  // TODO(ricea): Kill the renderer if it tries to send us a negative quota
  // value or > INT_MAX.
  DCHECK_GE(quota, 0);
  DCHECK_LE(quota, INT_MAX);
  if (!pending_received_frames_.empty()) {
    DCHECK_EQ(0, current_receive_quota_);
  }
  while (!pending_received_frames_.empty() && quota > 0) {
    PendingReceivedFrame& front = pending_received_frames_.front();
    const size_t data_size = front.size() - front.offset();
    const size_t bytes_to_send =
        std::min(base::checked_cast<size_t>(quota), data_size);
    // Only report the frame as final if all of its remaining data is sent now.
    const bool final = front.final() && data_size == bytes_to_send;
    const char* data =
        front.data().get() ? front.data()->data() + front.offset() : NULL;
    DCHECK(!bytes_to_send || data) << "Non empty data should not be null.";
    const std::vector<char> data_vector(data, data + bytes_to_send);
    DVLOG(3) << "Sending frame previously split due to quota to the "
             << "renderer: quota=" << quota << " data_size=" << data_size
             << " bytes_to_send=" << bytes_to_send;
    if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
        CHANNEL_DELETED)
      return;
    if (bytes_to_send < data_size) {
      // The quota is used up; keep the rest of the frame at the front of the
      // queue as a continuation.
      front.DidConsume(bytes_to_send);
      front.ResetOpcode();
      return;
    }
    const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
    DCHECK_GE(quota, signed_bytes_to_send);
    quota -= signed_bytes_to_send;

    pending_received_frames_.pop();
  }
  // If current_receive_quota_ == 0 then there is no pending ReadFrames()
  // operation.
  const bool start_read =
      current_receive_quota_ == 0 && quota > 0 &&
      (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
  current_receive_quota_ += base::checked_cast<int>(quota);
  if (start_read)
    AllowUnused(ReadFrames());
  // |this| may have been deleted.
}

// Begins closing the channel with the given |code| and |reason|. While still
// CONNECTING the connection attempt is abandoned and the channel dropped. When
// CONNECTED a Close frame is sent and the state becomes SEND_CLOSED; an
// invalid |code| or over-long |reason| is replaced by an internal server error
// code with an empty reason.
void WebSocketChannel::StartClosingHandshake(uint16 code,
                                             const std::string& reason) {
  if (InClosingState()) {
    // When the associated renderer process is killed while the channel is in
    // CLOSING state we reach here.
    DVLOG(1) << "StartClosingHandshake called in state " << state_
             << ". This may be a bug, or a harmless race.";
    return;
  }
  if (state_ == CONNECTING) {
    // Abort the in-progress handshake and drop the connection immediately.
    stream_request_.reset();
    SetState(CLOSED);
    AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
    return;
  }
  if (state_ != CONNECTED) {
    NOTREACHED() << "StartClosingHandshake() called in state " << state_;
    return;
  }
  // Javascript actually only permits 1000 and 3000-4999, but the implementation
  // itself may produce different codes. The length of |reason| is also checked
  // by Javascript.
  if (!IsStrictlyValidCloseStatusCode(code) ||
      reason.size() > kMaximumCloseReasonLength) {
    // "InternalServerError" is actually used for errors from any endpoint, per
    // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
    // reason it must be malfunctioning in some way, and based on that we
    // interpret this as an internal error.
    if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED) {
      DCHECK_EQ(CONNECTED, state_);
      SetState(SEND_CLOSED);
    }
    return;
  }
  // A reason that is not valid UTF-8 is replaced by an empty string.
  if (SendClose(
          code,
          StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
      CHANNEL_DELETED)
    return;
  DCHECK_EQ(CONNECTED, state_);
  SetState(SEND_CLOSED);
}

// Same as SendAddChannelRequest() but with a caller-supplied stream |creator|.
void WebSocketChannel::SendAddChannelRequestForTesting(
    const GURL& socket_url,
    const std::vector<std::string>& requested_subprotocols,
    const url::Origin& origin,
    const WebSocketStreamCreator& creator) {
  SendAddChannelRequestWithSuppliedCreator(
      socket_url, requested_subprotocols, origin, creator);
}

// Replaces the timeout that the constructor initialized from
// kClosingHandshakeTimeoutSeconds.
void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
    base::TimeDelta delay) {
  timeout_ = delay;
}

// Common implementation of the add-channel request. Reports failure through
// OnAddChannelResponse() if |socket_url| is not a ws: or wss: URL; otherwise
// runs |creator| to start the connection and enters the CONNECTING state.
void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
    const GURL& socket_url,
    const std::vector<std::string>& requested_subprotocols,
    const url::Origin& origin,
    const WebSocketStreamCreator& creator) {
  DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
  if (!socket_url.SchemeIsWSOrWSS()) {
    // TODO(ricea): Kill the renderer (this error should have been caught by
    // Javascript).
    AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
    // |this| is deleted here.
    return;
  }
  socket_url_ = socket_url;
  scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
      new ConnectDelegate(this));
  stream_request_ = creator.Run(socket_url_,
                                requested_subprotocols,
                                origin,
                                url_request_context_,
                                BoundNetLog(),
                                connect_delegate.Pass());
  SetState(CONNECTING);
}

// Called via ConnectDelegate when the connection succeeds. Takes ownership of
// |stream|, enters CONNECTED, reports the selected subprotocol and extensions,
// grants the initial send quota, and starts reading frames.
void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
  DCHECK(stream);
  DCHECK_EQ(CONNECTING, state_);

  stream_ = stream.Pass();

  SetState(CONNECTED);

  if (event_interface_->OnAddChannelResponse(
          false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
      CHANNEL_DELETED)
    return;

  // TODO(ricea): Get flow control information from the WebSocketStream once we
  // have a multiplexing WebSocketStream.
  current_send_quota_ = send_quota_high_water_mark_;
  if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
      CHANNEL_DELETED)
    return;

  // |stream_request_| is not used once the connection has succeeded.
  stream_request_.reset();

  AllowUnused(ReadFrames());
  // |this| may have been deleted.
}

// Called via ConnectDelegate when the connection fails. Enters CLOSED, sends
// any stored handshake information, then reports |message| through
// OnFailChannel().
void WebSocketChannel::OnConnectFailure(const std::string& message) {
  DCHECK_EQ(CONNECTING, state_);

  // Copy the message before we delete its owner.
  std::string message_copy = message;

  SetState(CLOSED);
  stream_request_.reset();

  if (CHANNEL_DELETED ==
      notification_sender_->SendImmediately(event_interface_.get())) {
    // |this| has been deleted.
    return;
  }
  AllowUnused(event_interface_->OnFailChannel(message_copy));
  // |this| has been deleted.
}

// Forwards an SSL certificate error to the event interface, adding the URL
// this channel is connecting to.
void WebSocketChannel::OnSSLCertificateError(
    scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,
    const SSLInfo& ssl_info,
    bool fatal) {
  AllowUnused(event_interface_->OnSSLCertificateError(
      ssl_error_callbacks.Pass(), socket_url_, ssl_info, fatal));
}

void WebSocketChannel::OnStartOpeningHandshake(
    scoped_ptr<WebSocketHandshakeRequestInfo> request) {
  DCHECK(!notification_sender_->handshake_request_info());

  // Because handling an IPC error synchronously is difficult, we notify the
  // information asynchronously.
  notification_sender_->set_handshake_request_info(request.Pass());
  ScheduleOpeningHandshakeNotification();
}

void WebSocketChannel::OnFinishOpeningHandshake(
    scoped_ptr<WebSocketHandshakeResponseInfo> response) {
  DCHECK(!notification_sender_->handshake_response_info());

  // Because handling an IPC error synchronously is difficult, we notify the
  // information asynchronously.
  notification_sender_->set_handshake_response_info(response.Pass());
  ScheduleOpeningHandshakeNotification();
}

// Posts a task to the current message loop that calls
// HandshakeNotificationSender::Send() with a weak pointer to the sender.
void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
  base::MessageLoop::current()->PostTask(
      FROM_HERE,
      base::Bind(HandshakeNotificationSender::Send,
                 notification_sender_->AsWeakPtr()));
}

// Writes |data_being_sent_| to the stream, and keeps writing for as long as
// writes complete synchronously with OK and more data has been queued.
// Returns CHANNEL_DELETED if OnWriteDone() reports that the channel is gone.
ChannelState WebSocketChannel::WriteFrames() {
  int result = OK;
  do {
    // This use of base::Unretained is safe because this object owns the
    // WebSocketStream and destroying it cancels all callbacks.
    result = stream_->WriteFrames(
        data_being_sent_->frames(),
        base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
                   base::Unretained(this),
                   false));
    if (result != ERR_IO_PENDING) {
      if (OnWriteDone(true, result) == CHANNEL_DELETED)
        return CHANNEL_DELETED;
      // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
      // guaranteed to be the same as before OnWriteDone() call.
    }
  } while (result == OK && data_being_sent_);
  return CHANNEL_ALIVE;
}

// Handles completion of a write. |synchronous| is true when called directly
// from the loop in WriteFrames() and false when invoked as the stream's
// completion callback. On OK, the next queued buffer (if any) becomes the one
// being sent; otherwise the send quota is refreshed once it has fallen below
// the low water mark. Any other result closes the stream and drops the
// channel.
ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
  DCHECK_NE(CONNECTING, state_);
  DCHECK_NE(ERR_IO_PENDING, result);
  DCHECK(data_being_sent_);
  switch (result) {
    case OK:
      if (data_to_send_next_) {
        data_being_sent_ = data_to_send_next_.Pass();
        // In the synchronous case the loop in WriteFrames() performs the next
        // write, so it is only started here for asynchronous completions.
        if (!synchronous)
          return WriteFrames();
      } else {
        data_being_sent_.reset();
        if (current_send_quota_ < send_quota_low_water_mark_) {
          // TODO(ricea): Increase low_water_mark and high_water_mark if
          // throughput is high, reduce them if throughput is low.  Low water
          // mark needs to be >= the bandwidth delay product *of the IPC
          // channel*. Because factors like context-switch time, thread wake-up
          // time, and bus speed come into play it is complex and probably needs
          // to be determined empirically.
          DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
          // TODO(ricea): Truncate quota by the quota specified by the remote
          // server, if the protocol in use supports quota.
          int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
          current_send_quota_ += fresh_quota;
          return event_interface_->OnFlowControl(fresh_quota);
        }
      }
      return CHANNEL_ALIVE;

    // If a recoverable error condition existed, it would go here.

    default:
      DCHECK_LT(result, 0)
          << "WriteFrames() should only return OK or ERR_ codes";

      stream_->Close();
      SetState(CLOSED);
      return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
  }
}

// Reads frames from the stream into |read_frames_| for as long as reads
// complete synchronously with OK and receive quota remains. Returns
// CHANNEL_DELETED if OnReadDone() reports that the channel is gone.
ChannelState WebSocketChannel::ReadFrames() {
  int result = OK;
  while (result == OK && current_receive_quota_ > 0) {
    // This use of base::Unretained is safe because this object owns the
    // WebSocketStream, and any pending reads will be cancelled when it is
    // destroyed.
    result = stream_->ReadFrames(
        &read_frames_,
        base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
                   base::Unretained(this),
                   false));
    if (result != ERR_IO_PENDING) {
      if (OnReadDone(true, result) == CHANNEL_DELETED)
        return CHANNEL_DELETED;
    }
    DCHECK_NE(CLOSED, state_);
  }
  return CHANNEL_ALIVE;
}

// Handles completion of a read. |synchronous| is true when called directly
// from the loop in ReadFrames() and false when invoked as the stream's
// completion callback. On OK every frame in |read_frames_| is passed to
// HandleFrame(). ERR_WS_PROTOCOL_ERROR fails the channel. Any other result
// closes the stream and drops the channel, using the close code and reason
// received from the server if there was one.
ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
  DCHECK_NE(CONNECTING, state_);
  DCHECK_NE(ERR_IO_PENDING, result);
  switch (result) {
    case OK:
      // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
      // with no data read, not an empty response.
      DCHECK(!read_frames_.empty())
          << "ReadFrames() returned OK, but nothing was read.";
      for (size_t i = 0; i < read_frames_.size(); ++i) {
        // Take ownership of the frame and clear its slot in |read_frames_|.
        scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
        read_frames_[i] = NULL;
        if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
          return CHANNEL_DELETED;
      }
      read_frames_.clear();
      // There should always be a call to ReadFrames pending.
      // TODO(ricea): Unless we are out of quota.
      DCHECK_NE(CLOSED, state_);
      if (!synchronous)
        return ReadFrames();
      return CHANNEL_ALIVE;

    case ERR_WS_PROTOCOL_ERROR:
      // This could be kWebSocketErrorProtocolError (specifically, non-minimal
      // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
      // extension-specific error.
      return FailChannel("Invalid frame header",
                         kWebSocketErrorProtocolError,
                         "WebSocket Protocol Error");

    default:
      DCHECK_LT(result, 0)
          << "ReadFrames() should only return OK or ERR_ codes";

      stream_->Close();
      SetState(CLOSED);

      uint16 code = kWebSocketErrorAbnormalClosure;
      std::string reason = "";
      bool was_clean = false;
      // A non-zero |received_close_code_| means a Close frame was received
      // earlier (see HandleFrameByState()).
      if (received_close_code_ != 0) {
        code = received_close_code_;
        reason = received_close_reason_;
        was_clean = (result == ERR_CONNECTION_CLOSED);
      }

      return DoDropChannel(was_clean, code, reason);
  }
}

// Validates the header of a received |frame| (it must not be masked and must
// not have reserved bits set), failing the channel otherwise, then dispatches
// it to HandleFrameByState().
ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
  if (frame->header.masked) {
    // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
    // masked frame."
    return FailChannel(
        "A server must not mask any frames that it sends to the "
        "client.",
        kWebSocketErrorProtocolError,
        "Masked frame from server");
  }
  const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
  DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
         frame->header.final);
  if (frame->header.reserved1 || frame->header.reserved2 ||
      frame->header.reserved3) {
    return FailChannel(base::StringPrintf(
                           "One or more reserved bits are on: reserved1 = %d, "
                           "reserved2 = %d, reserved3 = %d",
                           static_cast<int>(frame->header.reserved1),
                           static_cast<int>(frame->header.reserved2),
                           static_cast<int>(frame->header.reserved3)),
                       kWebSocketErrorProtocolError,
                       "Invalid reserved bit");
  }

  // Respond to the frame appropriately to its type.
  return HandleFrameByState(
      opcode, frame->header.final, frame->data, frame->header.payload_length);
}

// Acts on a received frame according to |opcode| and the current state. Data
// frames go to HandleDataFrame(), a Ping is answered with a Pong while
// CONNECTED, a Pong is ignored, and a Close frame advances the closing
// handshake. Any frame received in CLOSE_WAIT, and any unrecognized opcode,
// fails the channel.
ChannelState WebSocketChannel::HandleFrameByState(
    const WebSocketFrameHeader::OpCode opcode,
    bool final,
    const scoped_refptr<IOBuffer>& data_buffer,
    size_t size) {
  DCHECK_NE(RECV_CLOSED, state_)
      << "HandleFrame() does not support being called re-entrantly from within "
         "SendClose()";
  DCHECK_NE(CLOSED, state_);
  if (state_ == CLOSE_WAIT) {
    std::string frame_name;
    GetFrameTypeForOpcode(opcode, &frame_name);

    // FailChannel() won't send another Close frame.
    return FailChannel(
        frame_name + " received after close", kWebSocketErrorProtocolError, "");
  }
  switch (opcode) {
    case WebSocketFrameHeader::kOpCodeText:  // fall-thru
    case WebSocketFrameHeader::kOpCodeBinary:
    case WebSocketFrameHeader::kOpCodeContinuation:
      return HandleDataFrame(opcode, final, data_buffer, size);

    case WebSocketFrameHeader::kOpCodePing:
      DVLOG(1) << "Got Ping of size " << size;
      // The Pong reply reuses the Ping's data buffer and size.
      if (state_ == CONNECTED)
        return SendFrameFromIOBuffer(
            true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
      DVLOG(3) << "Ignored ping in state " << state_;
      return CHANNEL_ALIVE;

    case WebSocketFrameHeader::kOpCodePong:
      DVLOG(1) << "Got Pong of size " << size;
      // There is no need to do anything with pong messages.
      return CHANNEL_ALIVE;

    case WebSocketFrameHeader::kOpCodeClose: {
      // TODO(ricea): If there is a message which is queued for transmission to
      // the renderer, then the renderer should not receive an
      // OnClosingHandshake or OnDropChannel IPC until the queued message has
      // been completely transmitted.
      uint16 code = kWebSocketNormalClosure;
      std::string reason;
      std::string message;
      if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
        return FailChannel(message, code, reason);
      }
      // TODO(ricea): Find a way to safely log the message from the close
      // message (escape control codes and so on).
      DVLOG(1) << "Got Close with code " << code;
      switch (state_) {
        case CONNECTED:
          // The server initiated the close: reply with our own Close frame and
          // notify the event interface.
          SetState(RECV_CLOSED);
          if (SendClose(code, reason) == CHANNEL_DELETED)
            return CHANNEL_DELETED;
          DCHECK_EQ(RECV_CLOSED, state_);
          SetState(CLOSE_WAIT);

          if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
            return CHANNEL_DELETED;
          received_close_code_ = code;
          received_close_reason_ = reason;
          break;

        case SEND_CLOSED:
          SetState(CLOSE_WAIT);
          // From RFC6455 section 7.1.5: "Each endpoint
          // will see the status code sent by the other end as _The WebSocket
          // Connection Close Code_."
          received_close_code_ = code;
          received_close_reason_ = reason;
          break;

        default:
          LOG(DFATAL) << "Got Close in unexpected state " << state_;
          break;
      }
      return CHANNEL_ALIVE;
    }

    default:
      return FailChannel(
          base::StringPrintf("Unrecognized frame opcode: %d", opcode),
          kWebSocketErrorProtocolError,
          "Unknown opcode");
  }
}

// Handles a received Text, Binary or Continuation frame. Data frames are
// ignored unless the state is CONNECTED. The channel is failed if the frame is
// out of sequence (an unexpected continuation, or a new message before the
// previous one finished) or if a text message contains invalid UTF-8. Data
// that fits in the receive quota is forwarded through OnDataFrame(); whatever
// does not fit is queued in |pending_received_frames_| until SendFlowControl()
// supplies more quota.
ChannelState WebSocketChannel::HandleDataFrame(
    WebSocketFrameHeader::OpCode opcode,
    bool final,
    const scoped_refptr<IOBuffer>& data_buffer,
    size_t size) {
  if (state_ != CONNECTED) {
    DVLOG(3) << "Ignored data packet received in state " << state_;
    return CHANNEL_ALIVE;
  }
  DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
         opcode == WebSocketFrameHeader::kOpCodeText ||
         opcode == WebSocketFrameHeader::kOpCodeBinary);
  const bool got_continuation =
      (opcode == WebSocketFrameHeader::kOpCodeContinuation);
  if (got_continuation != expecting_to_handle_continuation_) {
    const std::string console_log = got_continuation
        ? "Received unexpected continuation frame."
        : "Received start of new message but previous message is unfinished.";
    const std::string reason = got_continuation
        ? "Unexpected continuation"
        : "Previous data frame unfinished";
    return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
  }
  expecting_to_handle_continuation_ = !final;
  WebSocketFrameHeader::OpCode opcode_to_send = opcode;
  // If no frame of the current message has been forwarded yet (earlier frames
  // were empty and skipped below), a continuation is reported with the
  // message's original Text or Binary opcode.
  if (!initial_frame_forwarded_ &&
      opcode == WebSocketFrameHeader::kOpCodeContinuation) {
    opcode_to_send = receiving_text_message_
                         ? WebSocketFrameHeader::kOpCodeText
                         : WebSocketFrameHeader::kOpCodeBinary;
  }
  if (opcode == WebSocketFrameHeader::kOpCodeText ||
      (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
       receiving_text_message_)) {
    // This call is not redundant when size == 0 because it tells us what
    // the current state is.
    StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
        size ? data_buffer->data() : NULL, size);
    if (state == StreamingUtf8Validator::INVALID ||
        (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
      return FailChannel("Could not decode a text frame as UTF-8.",
                         kWebSocketErrorProtocolError,
                         "Invalid UTF-8 in text frame");
    }
    receiving_text_message_ = !final;
    DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
  }
  // Empty non-final frames are not forwarded.
  if (size == 0U && !final)
    return CHANNEL_ALIVE;

  initial_frame_forwarded_ = !final;
  if (size > base::checked_cast<size_t>(current_receive_quota_) ||
      !pending_received_frames_.empty()) {
    const bool no_quota = (current_receive_quota_ == 0);
    DCHECK(no_quota || pending_received_frames_.empty());
    DVLOG(3) << "Queueing frame to renderer due to quota. quota="
             << current_receive_quota_ << " size=" << size;
    // With some quota left, the first |current_receive_quota_| bytes are
    // forwarded below and the queued entry starts at that offset as a
    // continuation. With no quota the whole frame is queued unchanged.
    WebSocketFrameHeader::OpCode opcode_to_queue =
        no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
    pending_received_frames_.push(PendingReceivedFrame(
        final, opcode_to_queue, data_buffer, current_receive_quota_, size));
    if (no_quota)
      return CHANNEL_ALIVE;
    size = current_receive_quota_;
    final = false;
  }

  // TODO(ricea): Can this copy be eliminated?
  const char* const data_begin = size ? data_buffer->data() : NULL;
  const char* const data_end = data_begin + size;
  const std::vector<char> data(data_begin, data_end);
  current_receive_quota_ -= size;
  DCHECK_GE(current_receive_quota_, 0);

  // Sends the received frame to the renderer process.
  return event_interface_->OnDataFrame(final, opcode_to_send, data);
}

ChannelState WebSocketChannel::SendFrameFromIOBuffer(
    bool fin,
    WebSocketFrameHeader::OpCode op_code,
    const scoped_refptr<IOBuffer>& buffer,
    size_t size) {
  DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
  DCHECK(stream_);

  scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
  WebSocketFrameHeader& header = frame->header;
  header.final = fin;
  header.masked = true;
  header.payload_length = size;
  frame->data = buffer;

  if (data_being_sent_) {
    // Either the link to the WebSocket server is saturated, or several messages
    // are being sent in a batch.
    // TODO(ricea): Keep some statistics to work out the situation and adjust
    // quota appropriately.
978 if (!data_to_send_next_) 979 data_to_send_next_.reset(new SendBuffer); 980 data_to_send_next_->AddFrame(frame.Pass()); 981 return CHANNEL_ALIVE; 982 } 983 984 data_being_sent_.reset(new SendBuffer); 985 data_being_sent_->AddFrame(frame.Pass()); 986 return WriteFrames(); 987 } 988 989 ChannelState WebSocketChannel::FailChannel(const std::string& message, 990 uint16 code, 991 const std::string& reason) { 992 DCHECK_NE(FRESHLY_CONSTRUCTED, state_); 993 DCHECK_NE(CONNECTING, state_); 994 DCHECK_NE(CLOSED, state_); 995 996 // TODO(ricea): Logging. 997 if (state_ == CONNECTED) { 998 if (SendClose(code, reason) == CHANNEL_DELETED) 999 return CHANNEL_DELETED; 1000 } 1001 1002 // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser 1003 // should close the connection itself without waiting for the closing 1004 // handshake. 1005 stream_->Close(); 1006 SetState(CLOSED); 1007 return event_interface_->OnFailChannel(message); 1008 } 1009 1010 ChannelState WebSocketChannel::SendClose(uint16 code, 1011 const std::string& reason) { 1012 DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED); 1013 DCHECK_LE(reason.size(), kMaximumCloseReasonLength); 1014 scoped_refptr<IOBuffer> body; 1015 size_t size = 0; 1016 if (code == kWebSocketErrorNoStatusReceived) { 1017 // Special case: translate kWebSocketErrorNoStatusReceived into a Close 1018 // frame with no payload. 1019 DCHECK(reason.empty()); 1020 body = new IOBuffer(0); 1021 } else { 1022 const size_t payload_length = kWebSocketCloseCodeLength + reason.length(); 1023 body = new IOBuffer(payload_length); 1024 size = payload_length; 1025 base::WriteBigEndian(body->data(), code); 1026 COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength, 1027 they_should_both_be_two); 1028 std::copy( 1029 reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength); 1030 } 1031 // This use of base::Unretained() is safe because we stop the timer in the 1032 // destructor. 
1033 timer_.Start( 1034 FROM_HERE, 1035 timeout_, 1036 base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this))); 1037 if (SendFrameFromIOBuffer( 1038 true, WebSocketFrameHeader::kOpCodeClose, body, size) == 1039 CHANNEL_DELETED) 1040 return CHANNEL_DELETED; 1041 return CHANNEL_ALIVE; 1042 } 1043 1044 bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer, 1045 size_t size, 1046 uint16* code, 1047 std::string* reason, 1048 std::string* message) { 1049 reason->clear(); 1050 if (size < kWebSocketCloseCodeLength) { 1051 if (size == 0U) { 1052 *code = kWebSocketErrorNoStatusReceived; 1053 return true; 1054 } 1055 1056 DVLOG(1) << "Close frame with payload size " << size << " received " 1057 << "(the first byte is " << std::hex 1058 << static_cast<int>(buffer->data()[0]) << ")"; 1059 *code = kWebSocketErrorProtocolError; 1060 *message = 1061 "Received a broken close frame containing an invalid size body."; 1062 return false; 1063 } 1064 1065 const char* data = buffer->data(); 1066 uint16 unchecked_code = 0; 1067 base::ReadBigEndian(data, &unchecked_code); 1068 COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength, 1069 they_should_both_be_two_bytes); 1070 1071 switch (unchecked_code) { 1072 case kWebSocketErrorNoStatusReceived: 1073 case kWebSocketErrorAbnormalClosure: 1074 case kWebSocketErrorTlsHandshake: 1075 *code = kWebSocketErrorProtocolError; 1076 *message = 1077 "Received a broken close frame containing a reserved status code."; 1078 return false; 1079 1080 default: 1081 *code = unchecked_code; 1082 break; 1083 } 1084 1085 std::string text(data + kWebSocketCloseCodeLength, data + size); 1086 if (StreamingUtf8Validator::Validate(text)) { 1087 reason->swap(text); 1088 return true; 1089 } 1090 1091 *code = kWebSocketErrorProtocolError; 1092 *reason = "Invalid UTF-8 in Close frame"; 1093 *message = "Received a broken close frame containing invalid UTF-8."; 1094 return false; 1095 } 1096 1097 ChannelState 
WebSocketChannel::DoDropChannel(bool was_clean, 1098 uint16 code, 1099 const std::string& reason) { 1100 if (CHANNEL_DELETED == 1101 notification_sender_->SendImmediately(event_interface_.get())) 1102 return CHANNEL_DELETED; 1103 ChannelState result = 1104 event_interface_->OnDropChannel(was_clean, code, reason); 1105 DCHECK_EQ(CHANNEL_DELETED, result); 1106 return result; 1107 } 1108 1109 void WebSocketChannel::CloseTimeout() { 1110 stream_->Close(); 1111 SetState(CLOSED); 1112 AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, "")); 1113 // |this| has been deleted. 1114 } 1115 1116 } // namespace net 1117