1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/websockets/websocket_job.h" 6 7 #include <algorithm> 8 9 #include "base/lazy_instance.h" 10 #include "base/string_tokenizer.h" 11 #include "googleurl/src/gurl.h" 12 #include "net/base/net_errors.h" 13 #include "net/base/net_log.h" 14 #include "net/base/cookie_policy.h" 15 #include "net/base/cookie_store.h" 16 #include "net/base/io_buffer.h" 17 #include "net/http/http_util.h" 18 #include "net/url_request/url_request_context.h" 19 #include "net/websockets/websocket_frame_handler.h" 20 #include "net/websockets/websocket_handshake_handler.h" 21 #include "net/websockets/websocket_net_log_params.h" 22 #include "net/websockets/websocket_throttle.h" 23 24 namespace { 25 26 // lower-case header names. 27 const char* const kCookieHeaders[] = { 28 "cookie", "cookie2" 29 }; 30 const char* const kSetCookieHeaders[] = { 31 "set-cookie", "set-cookie2" 32 }; 33 34 net::SocketStreamJob* WebSocketJobFactory( 35 const GURL& url, net::SocketStream::Delegate* delegate) { 36 net::WebSocketJob* job = new net::WebSocketJob(delegate); 37 job->InitSocketStream(new net::SocketStream(url, job)); 38 return job; 39 } 40 41 class WebSocketJobInitSingleton { 42 private: 43 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>; 44 WebSocketJobInitSingleton() { 45 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory); 46 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory); 47 } 48 }; 49 50 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init( 51 base::LINKER_INITIALIZED); 52 53 } // anonymous namespace 54 55 namespace net { 56 57 // static 58 void WebSocketJob::EnsureInit() { 59 g_websocket_job_init.Get(); 60 } 61 62 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) 63 : delegate_(delegate), 64 state_(INITIALIZED), 65 waiting_(false), 66 callback_(NULL), 67 handshake_request_(new WebSocketHandshakeRequestHandler), 68 handshake_response_(new WebSocketHandshakeResponseHandler), 69 handshake_request_sent_(0), 70 response_cookies_save_index_(0), 71 send_frame_handler_(new WebSocketFrameHandler), 72 receive_frame_handler_(new WebSocketFrameHandler) { 73 } 74 75 WebSocketJob::~WebSocketJob() { 76 DCHECK_EQ(CLOSED, state_); 77 DCHECK(!delegate_); 78 DCHECK(!socket_.get()); 79 } 80 81 void WebSocketJob::Connect() { 82 DCHECK(socket_.get()); 83 DCHECK_EQ(state_, INITIALIZED); 84 state_ = CONNECTING; 85 socket_->Connect(); 86 } 87 88 bool WebSocketJob::SendData(const char* data, int len) { 89 switch (state_) { 90 case INITIALIZED: 91 return false; 92 93 case CONNECTING: 94 return SendHandshakeRequest(data, len); 95 96 case OPEN: 97 { 98 send_frame_handler_->AppendData(data, len); 99 // If current buffer is sending now, this data will be sent in 100 // SendPending() after current data was sent. 101 // Do not buffer sending data for now. Since 102 // WebCore::SocketStreamHandle controls traffic to keep number of 103 // pending bytes less than max_pending_send_allowed, so when sending 104 // larger message than max_pending_send_allowed should not be buffered. 105 // If we don't call OnSentData, WebCore::SocketStreamHandle would stop 106 // sending more data when pending data reaches max_pending_send_allowed. 107 // TODO(ukai): Fix this to support compression for larger message. 108 int err = 0; 109 if (!send_frame_handler_->GetCurrentBuffer() && 110 (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) { 111 DCHECK(!current_buffer_); 112 current_buffer_ = new DrainableIOBuffer( 113 send_frame_handler_->GetCurrentBuffer(), 114 send_frame_handler_->GetCurrentBufferSize()); 115 return socket_->SendData( 116 current_buffer_->data(), current_buffer_->BytesRemaining()); 117 } 118 return err >= 0; 119 } 120 121 case CLOSING: 122 case CLOSED: 123 return false; 124 } 125 return false; 126 } 127 128 void WebSocketJob::Close() { 129 state_ = CLOSING; 130 if (current_buffer_) { 131 // Will close in SendPending. 132 return; 133 } 134 state_ = CLOSED; 135 socket_->Close(); 136 } 137 138 void WebSocketJob::RestartWithAuth( 139 const string16& username, 140 const string16& password) { 141 state_ = CONNECTING; 142 socket_->RestartWithAuth(username, password); 143 } 144 145 void WebSocketJob::DetachDelegate() { 146 state_ = CLOSED; 147 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 148 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); 149 150 scoped_refptr<WebSocketJob> protect(this); 151 152 delegate_ = NULL; 153 if (socket_) 154 socket_->DetachDelegate(); 155 socket_ = NULL; 156 if (callback_) { 157 waiting_ = false; 158 callback_ = NULL; 159 Release(); // Balanced with OnStartOpenConnection(). 160 } 161 } 162 163 int WebSocketJob::OnStartOpenConnection( 164 SocketStream* socket, CompletionCallback* callback) { 165 DCHECK(!callback_); 166 state_ = CONNECTING; 167 addresses_.Copy(socket->address_list().head(), true); 168 WebSocketThrottle::GetInstance()->PutInQueue(this); 169 if (!waiting_) 170 return OK; 171 callback_ = callback; 172 AddRef(); // Balanced when callback_ becomes NULL. 173 return ERR_IO_PENDING; 174 } 175 176 void WebSocketJob::OnConnected( 177 SocketStream* socket, int max_pending_send_allowed) { 178 if (state_ == CLOSED) 179 return; 180 DCHECK_EQ(CONNECTING, state_); 181 if (delegate_) 182 delegate_->OnConnected(socket, max_pending_send_allowed); 183 } 184 185 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) { 186 DCHECK_NE(INITIALIZED, state_); 187 if (state_ == CLOSED) 188 return; 189 if (state_ == CONNECTING) { 190 OnSentHandshakeRequest(socket, amount_sent); 191 return; 192 } 193 if (delegate_) { 194 DCHECK(state_ == OPEN || state_ == CLOSING); 195 DCHECK_GT(amount_sent, 0); 196 DCHECK(current_buffer_); 197 current_buffer_->DidConsume(amount_sent); 198 if (current_buffer_->BytesRemaining() > 0) 199 return; 200 201 // We need to report amount_sent of original buffer size, instead of 202 // amount sent to |socket|. 203 amount_sent = send_frame_handler_->GetOriginalBufferSize(); 204 DCHECK_GT(amount_sent, 0); 205 current_buffer_ = NULL; 206 send_frame_handler_->ReleaseCurrentBuffer(); 207 delegate_->OnSentData(socket, amount_sent); 208 MessageLoopForIO::current()->PostTask( 209 FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending)); 210 } 211 } 212 213 void WebSocketJob::OnReceivedData( 214 SocketStream* socket, const char* data, int len) { 215 DCHECK_NE(INITIALIZED, state_); 216 if (state_ == CLOSED) 217 return; 218 if (state_ == CONNECTING) { 219 OnReceivedHandshakeResponse(socket, data, len); 220 return; 221 } 222 DCHECK(state_ == OPEN || state_ == CLOSING); 223 std::string received_data; 224 receive_frame_handler_->AppendData(data, len); 225 // Don't buffer receiving data for now. 226 // TODO(ukai): fix performance of WebSocketFrameHandler. 227 while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) { 228 received_data += 229 std::string(receive_frame_handler_->GetCurrentBuffer()->data(), 230 receive_frame_handler_->GetCurrentBufferSize()); 231 receive_frame_handler_->ReleaseCurrentBuffer(); 232 } 233 if (delegate_ && !received_data.empty()) 234 delegate_->OnReceivedData( 235 socket, received_data.data(), received_data.size()); 236 } 237 238 void WebSocketJob::OnClose(SocketStream* socket) { 239 state_ = CLOSED; 240 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 241 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); 242 243 scoped_refptr<WebSocketJob> protect(this); 244 245 SocketStream::Delegate* delegate = delegate_; 246 delegate_ = NULL; 247 socket_ = NULL; 248 if (callback_) { 249 waiting_ = false; 250 callback_ = NULL; 251 Release(); // Balanced with OnStartOpenConnection(). 252 } 253 if (delegate) 254 delegate->OnClose(socket); 255 } 256 257 void WebSocketJob::OnAuthRequired( 258 SocketStream* socket, AuthChallengeInfo* auth_info) { 259 if (delegate_) 260 delegate_->OnAuthRequired(socket, auth_info); 261 } 262 263 void WebSocketJob::OnError(const SocketStream* socket, int error) { 264 if (delegate_) 265 delegate_->OnError(socket, error); 266 } 267 268 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { 269 DCHECK_EQ(state_, CONNECTING); 270 if (!handshake_request_->ParseRequest(data, len)) 271 return false; 272 273 // handshake message is completed. 274 AddCookieHeaderAndSend(); 275 // Just buffered in |handshake_request_|. 276 return true; 277 } 278 279 void WebSocketJob::AddCookieHeaderAndSend() { 280 int policy = OK; 281 if (socket_->context()->cookie_policy()) { 282 GURL url_for_cookies = GetURLForCookies(); 283 policy = socket_->context()->cookie_policy()->CanGetCookies( 284 url_for_cookies, 285 url_for_cookies); 286 } 287 DCHECK_NE(ERR_IO_PENDING, policy); 288 OnCanGetCookiesCompleted(policy); 289 } 290 291 void WebSocketJob::OnCanGetCookiesCompleted(int policy) { 292 if (socket_ && delegate_ && state_ == CONNECTING) { 293 handshake_request_->RemoveHeaders( 294 kCookieHeaders, arraysize(kCookieHeaders)); 295 if (policy == OK) { 296 // Add cookies, including HttpOnly cookies. 297 if (socket_->context()->cookie_store()) { 298 CookieOptions cookie_options; 299 cookie_options.set_include_httponly(); 300 std::string cookie = 301 socket_->context()->cookie_store()->GetCookiesWithOptions( 302 GetURLForCookies(), cookie_options); 303 if (!cookie.empty()) 304 handshake_request_->AppendHeaderIfMissing("Cookie", cookie); 305 } 306 } 307 308 const std::string& handshake_request = handshake_request_->GetRawRequest(); 309 handshake_request_sent_ = 0; 310 socket_->net_log()->AddEvent( 311 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, 312 make_scoped_refptr( 313 new NetLogWebSocketHandshakeParameter(handshake_request))); 314 socket_->SendData(handshake_request.data(), 315 handshake_request.size()); 316 } 317 } 318 319 void WebSocketJob::OnSentHandshakeRequest( 320 SocketStream* socket, int amount_sent) { 321 DCHECK_EQ(state_, CONNECTING); 322 handshake_request_sent_ += amount_sent; 323 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); 324 if (handshake_request_sent_ >= handshake_request_->raw_length()) { 325 // handshake request has been sent. 326 // notify original size of handshake request to delegate. 327 if (delegate_) 328 delegate_->OnSentData( 329 socket, 330 handshake_request_->original_length()); 331 handshake_request_.reset(); 332 } 333 } 334 335 void WebSocketJob::OnReceivedHandshakeResponse( 336 SocketStream* socket, const char* data, int len) { 337 DCHECK_EQ(state_, CONNECTING); 338 if (handshake_response_->HasResponse()) { 339 // If we already has handshake response, received data should be frame 340 // data, not handshake message. 341 receive_frame_handler_->AppendData(data, len); 342 return; 343 } 344 345 size_t response_length = handshake_response_->ParseRawResponse(data, len); 346 if (!handshake_response_->HasResponse()) { 347 // not yet. we need more data. 348 return; 349 } 350 // handshake message is completed. 351 socket_->net_log()->AddEvent( 352 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS, 353 make_scoped_refptr(new NetLogWebSocketHandshakeParameter( 354 handshake_response_->GetRawResponse()))); 355 if (len - response_length > 0) { 356 // If we received extra data, it should be frame data. 357 receive_frame_handler_->AppendData(data + response_length, 358 len - response_length); 359 } 360 SaveCookiesAndNotifyHeaderComplete(); 361 } 362 363 void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() { 364 // handshake message is completed. 365 DCHECK(handshake_response_->HasResponse()); 366 367 response_cookies_.clear(); 368 response_cookies_save_index_ = 0; 369 370 handshake_response_->GetHeaders( 371 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_); 372 373 // Now, loop over the response cookies, and attempt to persist each. 374 SaveNextCookie(); 375 } 376 377 void WebSocketJob::SaveNextCookie() { 378 if (response_cookies_save_index_ == response_cookies_.size()) { 379 response_cookies_.clear(); 380 response_cookies_save_index_ = 0; 381 382 // Remove cookie headers, with malformed headers preserved. 383 // Actual handshake should be done in WebKit. 384 handshake_response_->RemoveHeaders( 385 kSetCookieHeaders, arraysize(kSetCookieHeaders)); 386 std::string received_data = handshake_response_->GetResponse(); 387 // Don't buffer receiving data for now. 388 // TODO(ukai): fix performance of WebSocketFrameHandler. 389 while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) { 390 received_data += 391 std::string(receive_frame_handler_->GetCurrentBuffer()->data(), 392 receive_frame_handler_->GetCurrentBufferSize()); 393 receive_frame_handler_->ReleaseCurrentBuffer(); 394 } 395 396 state_ = OPEN; 397 if (delegate_) 398 delegate_->OnReceivedData( 399 socket_, received_data.data(), received_data.size()); 400 401 handshake_response_.reset(); 402 403 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 404 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); 405 return; 406 } 407 408 int policy = OK; 409 if (socket_->context()->cookie_policy()) { 410 GURL url_for_cookies = GetURLForCookies(); 411 policy = socket_->context()->cookie_policy()->CanSetCookie( 412 url_for_cookies, 413 url_for_cookies, 414 response_cookies_[response_cookies_save_index_]); 415 } 416 417 DCHECK_NE(ERR_IO_PENDING, policy); 418 OnCanSetCookieCompleted(policy); 419 } 420 421 void WebSocketJob::OnCanSetCookieCompleted(int policy) { 422 if (socket_ && delegate_ && state_ == CONNECTING) { 423 if ((policy == OK || policy == OK_FOR_SESSION_ONLY) && 424 socket_->context()->cookie_store()) { 425 CookieOptions options; 426 options.set_include_httponly(); 427 if (policy == OK_FOR_SESSION_ONLY) 428 options.set_force_session(); 429 GURL url_for_cookies = GetURLForCookies(); 430 socket_->context()->cookie_store()->SetCookieWithOptions( 431 url_for_cookies, response_cookies_[response_cookies_save_index_], 432 options); 433 } 434 response_cookies_save_index_++; 435 SaveNextCookie(); 436 } 437 } 438 439 GURL WebSocketJob::GetURLForCookies() const { 440 GURL url = socket_->url(); 441 std::string scheme = socket_->is_secure() ? "https" : "http"; 442 url_canon::Replacements<char> replacements; 443 replacements.SetScheme(scheme.c_str(), 444 url_parse::Component(0, scheme.length())); 445 return url.ReplaceComponents(replacements); 446 } 447 448 const AddressList& WebSocketJob::address_list() const { 449 return addresses_; 450 } 451 452 void WebSocketJob::SetWaiting() { 453 waiting_ = true; 454 } 455 456 bool WebSocketJob::IsWaiting() const { 457 return waiting_; 458 } 459 460 void WebSocketJob::Wakeup() { 461 if (!waiting_) 462 return; 463 waiting_ = false; 464 DCHECK(callback_); 465 MessageLoopForIO::current()->PostTask( 466 FROM_HERE, 467 NewRunnableMethod(this, 468 &WebSocketJob::DoCallback)); 469 } 470 471 void WebSocketJob::DoCallback() { 472 // |callback_| may be NULL if OnClose() or DetachDelegate() was called. 473 if (callback_) { 474 net::CompletionCallback* callback = callback_; 475 callback_ = NULL; 476 callback->Run(net::OK); 477 Release(); // Balanced with OnStartOpenConnection(). 478 } 479 } 480 481 void WebSocketJob::SendPending() { 482 if (current_buffer_) 483 return; 484 // Current buffer is done. Try next buffer if any. 485 // Don't buffer sending data. See comment on case OPEN in SendData(). 486 if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) { 487 // No more data to send. 488 if (state_ == CLOSING) 489 socket_->Close(); 490 return; 491 } 492 current_buffer_ = new DrainableIOBuffer( 493 send_frame_handler_->GetCurrentBuffer(), 494 send_frame_handler_->GetCurrentBufferSize()); 495 socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining()); 496 } 497 498 } // namespace net 499