1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/tools/flip_server/sm_connection.h" 6 7 #include <errno.h> 8 #include <netinet/tcp.h> 9 #include <sys/socket.h> 10 11 #include <list> 12 #include <string> 13 14 #include "net/tools/flip_server/constants.h" 15 #include "net/tools/flip_server/flip_config.h" 16 #include "net/tools/flip_server/http_interface.h" 17 #include "net/tools/flip_server/spdy_interface.h" 18 #include "net/tools/flip_server/spdy_ssl.h" 19 #include "net/tools/flip_server/streamer_interface.h" 20 21 namespace net { 22 23 // static 24 bool SMConnection::force_spdy_ = false; 25 26 SMConnection::SMConnection(EpollServer* epoll_server, 27 SSLState* ssl_state, 28 MemoryCache* memory_cache, 29 FlipAcceptor* acceptor, 30 std::string log_prefix) 31 : last_read_time_(0), 32 fd_(-1), 33 events_(0), 34 registered_in_epoll_server_(false), 35 initialized_(false), 36 protocol_detected_(false), 37 connection_complete_(false), 38 connection_pool_(NULL), 39 epoll_server_(epoll_server), 40 ssl_state_(ssl_state), 41 memory_cache_(memory_cache), 42 acceptor_(acceptor), 43 read_buffer_(kSpdySegmentSize * 40), 44 sm_spdy_interface_(NULL), 45 sm_http_interface_(NULL), 46 sm_streamer_interface_(NULL), 47 sm_interface_(NULL), 48 log_prefix_(log_prefix), 49 max_bytes_sent_per_dowrite_(4096), 50 ssl_(NULL) { 51 } 52 53 SMConnection::~SMConnection() { 54 if (initialized()) 55 Reset(); 56 } 57 58 EpollServer* SMConnection::epoll_server() { 59 return epoll_server_; 60 } 61 62 void SMConnection::ReadyToSend() { 63 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 64 << "Setting ready to send: EPOLLIN | EPOLLOUT"; 65 epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT); 66 } 67 68 void SMConnection::EnqueueDataFrame(DataFrame* df) { 69 output_list_.push_back(df); 70 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: " 71 << "size = " << df->size << ": Setting FD ready."; 72 ReadyToSend(); 73 } 74 75 void SMConnection::InitSMConnection(SMConnectionPoolInterface* connection_pool, 76 SMInterface* sm_interface, 77 EpollServer* epoll_server, 78 int fd, 79 std::string server_ip, 80 std::string server_port, 81 std::string remote_ip, 82 bool use_ssl) { 83 if (initialized_) { 84 LOG(FATAL) << "Attempted to initialize already initialized server"; 85 return; 86 } 87 88 client_ip_ = remote_ip; 89 90 if (fd == -1) { 91 // If fd == -1, then we are initializing a new connection that will 92 // connect to the backend. 93 // 94 // ret: -1 == error 95 // 0 == connection in progress 96 // 1 == connection complete 97 // TODO(kelindsay): is_numeric_host_address value needs to be detected 98 server_ip_ = server_ip; 99 server_port_ = server_port; 100 int ret = CreateConnectedSocket(&fd_, 101 server_ip, 102 server_port, 103 true, 104 acceptor_->disable_nagle_); 105 106 if (ret < 0) { 107 LOG(ERROR) << "-1 Could not create connected socket"; 108 return; 109 } else if (ret == 1) { 110 DCHECK_NE(-1, fd_); 111 connection_complete_ = true; 112 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 113 << "Connection complete to: " << server_ip_ << ":" 114 << server_port_ << " "; 115 } 116 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 117 << "Connecting to server: " << server_ip_ << ":" 118 << server_port_ << " "; 119 } else { 120 // If fd != -1 then we are initializing a connection that has just been 121 // accepted from the listen socket. 122 connection_complete_ = true; 123 if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) { 124 epoll_server_->UnregisterFD(fd_); 125 } 126 if (fd_ != -1) { 127 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 128 << "Closing pre-existing fd"; 129 close(fd_); 130 fd_ = -1; 131 } 132 133 fd_ = fd; 134 } 135 136 registered_in_epoll_server_ = false; 137 // Set the last read time here as the idle checker will start from 138 // now. 139 last_read_time_ = time(NULL); 140 initialized_ = true; 141 142 connection_pool_ = connection_pool; 143 epoll_server_ = epoll_server; 144 145 if (sm_interface) { 146 sm_interface_ = sm_interface; 147 protocol_detected_ = true; 148 } 149 150 read_buffer_.Clear(); 151 152 epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET); 153 154 if (use_ssl) { 155 ssl_ = CreateSSLContext(ssl_state_->ssl_ctx); 156 SSL_set_fd(ssl_, fd_); 157 PrintSslError(); 158 } 159 } 160 161 void SMConnection::CorkSocket() { 162 int state = 1; 163 int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); 164 if (rv < 0) 165 VLOG(1) << "setsockopt(CORK): " << errno; 166 } 167 168 void SMConnection::UncorkSocket() { 169 int state = 0; 170 int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); 171 if (rv < 0) 172 VLOG(1) << "setsockopt(CORK): " << errno; 173 } 174 175 int SMConnection::Send(const char* data, int len, int flags) { 176 int rv = 0; 177 CorkSocket(); 178 if (ssl_) { 179 ssize_t bytes_written = 0; 180 // Write smallish chunks to SSL so that we don't have large 181 // multi-packet TLS records to receive before being able to handle 182 // the data. We don't have to be too careful here, because our data 183 // frames are already getting chunked appropriately, and those are 184 // the most likely "big" frames. 185 while (len > 0) { 186 const int kMaxTLSRecordSize = 1500; 187 const char* ptr = &(data[bytes_written]); 188 int chunksize = std::min(len, kMaxTLSRecordSize); 189 rv = SSL_write(ssl_, ptr, chunksize); 190 VLOG(2) << "SSLWrite(" << chunksize << " bytes): " << rv; 191 if (rv <= 0) { 192 switch (SSL_get_error(ssl_, rv)) { 193 case SSL_ERROR_WANT_READ: 194 case SSL_ERROR_WANT_WRITE: 195 case SSL_ERROR_WANT_ACCEPT: 196 case SSL_ERROR_WANT_CONNECT: 197 rv = -2; 198 break; 199 default: 200 PrintSslError(); 201 break; 202 } 203 break; 204 } 205 bytes_written += rv; 206 len -= rv; 207 if (rv != chunksize) 208 break; // If we couldn't write everything, we're implicitly stalled 209 } 210 // If we wrote some data, return that count. Otherwise 211 // return the stall error. 212 if (bytes_written > 0) 213 rv = bytes_written; 214 } else { 215 rv = send(fd_, data, len, flags); 216 } 217 if (!(flags & MSG_MORE)) 218 UncorkSocket(); 219 return rv; 220 } 221 222 void SMConnection::OnRegistration(EpollServer* eps, int fd, int event_mask) { 223 registered_in_epoll_server_ = true; 224 } 225 226 void SMConnection::OnEvent(int fd, EpollEvent* event) { 227 events_ |= event->in_events; 228 HandleEvents(); 229 if (events_) { 230 event->out_ready_mask = events_; 231 events_ = 0; 232 } 233 } 234 235 void SMConnection::OnUnregistration(int fd, bool replaced) { 236 registered_in_epoll_server_ = false; 237 } 238 239 void SMConnection::OnShutdown(EpollServer* eps, int fd) { 240 Cleanup("OnShutdown"); 241 return; 242 } 243 244 void SMConnection::Cleanup(const char* cleanup) { 245 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup; 246 if (!initialized_) 247 return; 248 Reset(); 249 if (connection_pool_) 250 connection_pool_->SMConnectionDone(this); 251 if (sm_interface_) 252 sm_interface_->ResetForNewConnection(); 253 last_read_time_ = 0; 254 } 255 256 void SMConnection::HandleEvents() { 257 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: " 258 << EpollServer::EventMaskToString(events_).c_str(); 259 260 if (events_ & EPOLLIN) { 261 if (!DoRead()) 262 goto handle_close_or_error; 263 } 264 265 if (events_ & EPOLLOUT) { 266 // Check if we have connected or not 267 if (connection_complete_ == false) { 268 int sock_error; 269 socklen_t sock_error_len = sizeof(sock_error); 270 int ret = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error, 271 &sock_error_len); 272 if (ret != 0) { 273 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 274 << "getsockopt error: " << errno << ": " << strerror(errno); 275 goto handle_close_or_error; 276 } 277 if (sock_error == 0) { 278 connection_complete_ = true; 279 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 280 << "Connection complete to " << server_ip_ << ":" 281 << server_port_ << " "; 282 } else if (sock_error == EINPROGRESS) { 283 return; 284 } else { 285 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 286 << "error connecting to server"; 287 goto handle_close_or_error; 288 } 289 } 290 if (!DoWrite()) 291 goto handle_close_or_error; 292 } 293 294 if (events_ & (EPOLLHUP | EPOLLERR)) { 295 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR"; 296 goto handle_close_or_error; 297 } 298 return; 299 300 handle_close_or_error: 301 Cleanup("HandleEvents"); 302 } 303 304 // Decide if SPDY was negotiated. 305 bool SMConnection::WasSpdyNegotiated() { 306 if (force_spdy()) 307 return true; 308 309 // If this is an SSL connection, check if NPN specifies SPDY. 310 if (ssl_) { 311 const unsigned char *npn_proto; 312 unsigned int npn_proto_len; 313 SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len); 314 if (npn_proto_len > 0) { 315 std::string npn_proto_str((const char *)npn_proto, npn_proto_len); 316 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 317 << "NPN protocol detected: " << npn_proto_str; 318 if (!strncmp(reinterpret_cast<const char*>(npn_proto), 319 "spdy/2", npn_proto_len)) 320 return true; 321 } 322 } 323 324 return false; 325 } 326 327 bool SMConnection::SetupProtocolInterfaces() { 328 DCHECK(!protocol_detected_); 329 protocol_detected_ = true; 330 331 bool spdy_negotiated = WasSpdyNegotiated(); 332 bool using_ssl = ssl_ != NULL; 333 334 if (using_ssl) 335 VLOG(1) << (SSL_session_reused(ssl_) ? "Resumed" : "Renegotiated") 336 << " SSL Session."; 337 338 if (acceptor_->spdy_only_ && !spdy_negotiated) { 339 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 340 << "SPDY proxy only, closing HTTPS connection."; 341 return false; 342 } 343 344 switch (acceptor_->flip_handler_type_) { 345 case FLIP_HANDLER_HTTP_SERVER: 346 { 347 DCHECK(!spdy_negotiated); 348 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 349 << (sm_http_interface_ ? "Creating" : "Reusing") 350 << " HTTP interface."; 351 if (!sm_http_interface_) 352 sm_http_interface_ = new HttpSM(this, 353 NULL, 354 epoll_server_, 355 memory_cache_, 356 acceptor_); 357 sm_interface_ = sm_http_interface_; 358 } 359 break; 360 case FLIP_HANDLER_PROXY: 361 { 362 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 363 << (sm_streamer_interface_ ? "Creating" : "Reusing") 364 << " PROXY Streamer interface."; 365 if (!sm_streamer_interface_) { 366 sm_streamer_interface_ = new StreamerSM(this, 367 NULL, 368 epoll_server_, 369 acceptor_); 370 sm_streamer_interface_->set_is_request(); 371 } 372 sm_interface_ = sm_streamer_interface_; 373 // If spdy is not negotiated, the streamer interface will proxy all 374 // data to the origin server. 375 if (!spdy_negotiated) 376 break; 377 } 378 // Otherwise fall through into the case below. 379 case FLIP_HANDLER_SPDY_SERVER: 380 { 381 DCHECK(spdy_negotiated); 382 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 383 << (sm_spdy_interface_ ? "Creating" : "Reusing") 384 << " SPDY interface."; 385 if (!sm_spdy_interface_) 386 sm_spdy_interface_ = new SpdySM(this, 387 NULL, 388 epoll_server_, 389 memory_cache_, 390 acceptor_); 391 sm_interface_ = sm_spdy_interface_; 392 } 393 break; 394 } 395 396 CorkSocket(); 397 if (!sm_interface_->PostAcceptHook()) 398 return false; 399 400 return true; 401 } 402 403 bool SMConnection::DoRead() { 404 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()"; 405 while (!read_buffer_.Full()) { 406 char* bytes; 407 int size; 408 if (fd_ == -1) { 409 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 410 << "DoRead(): fd_ == -1. Invalid FD. Returning false"; 411 return false; 412 } 413 read_buffer_.GetWritablePtr(&bytes, &size); 414 ssize_t bytes_read = 0; 415 if (ssl_) { 416 bytes_read = SSL_read(ssl_, bytes, size); 417 if (bytes_read < 0) { 418 int err = SSL_get_error(ssl_, bytes_read); 419 switch (err) { 420 case SSL_ERROR_WANT_READ: 421 case SSL_ERROR_WANT_WRITE: 422 case SSL_ERROR_WANT_ACCEPT: 423 case SSL_ERROR_WANT_CONNECT: 424 events_ &= ~EPOLLIN; 425 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 426 << "DoRead: SSL WANT_XXX: " << err; 427 goto done; 428 default: 429 PrintSslError(); 430 goto error_or_close; 431 } 432 } 433 } else { 434 bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT); 435 } 436 int stored_errno = errno; 437 if (bytes_read == -1) { 438 switch (stored_errno) { 439 case EAGAIN: 440 events_ &= ~EPOLLIN; 441 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 442 << "Got EAGAIN while reading"; 443 goto done; 444 case EINTR: 445 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 446 << "Got EINTR while reading"; 447 continue; 448 default: 449 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 450 << "While calling recv, got error: " 451 << (ssl_?"(ssl error)":strerror(stored_errno)); 452 goto error_or_close; 453 } 454 } else if (bytes_read > 0) { 455 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read 456 << " bytes"; 457 last_read_time_ = time(NULL); 458 // If the protocol hasn't been detected yet, set up the handlers 459 // we'll need. 460 if (!protocol_detected_) { 461 if (!SetupProtocolInterfaces()) { 462 LOG(ERROR) << "Error setting up protocol interfaces."; 463 goto error_or_close; 464 } 465 } 466 read_buffer_.AdvanceWritablePtr(bytes_read); 467 if (!DoConsumeReadData()) 468 goto error_or_close; 469 continue; 470 } else { // bytes_read == 0 471 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 472 << "0 bytes read with recv call."; 473 } 474 goto error_or_close; 475 } 476 done: 477 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead done!"; 478 return true; 479 480 error_or_close: 481 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 482 << "DoRead(): error_or_close. " 483 << "Cleaning up, then returning false"; 484 Cleanup("DoRead"); 485 return false; 486 } 487 488 bool SMConnection::DoConsumeReadData() { 489 char* bytes; 490 int size; 491 read_buffer_.GetReadablePtr(&bytes, &size); 492 while (size != 0) { 493 size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size); 494 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed " 495 << bytes_consumed << " bytes"; 496 if (bytes_consumed == 0) { 497 break; 498 } 499 read_buffer_.AdvanceReadablePtr(bytes_consumed); 500 if (sm_interface_->MessageFullyRead()) { 501 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 502 << "HandleRequestFullyRead: Setting EPOLLOUT"; 503 HandleResponseFullyRead(); 504 events_ |= EPOLLOUT; 505 } else if (sm_interface_->Error()) { 506 LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 507 << "Framer error detected: Setting EPOLLOUT: " 508 << sm_interface_->ErrorAsString(); 509 // this causes everything to be closed/cleaned up. 510 events_ |= EPOLLOUT; 511 return false; 512 } 513 read_buffer_.GetReadablePtr(&bytes, &size); 514 } 515 return true; 516 } 517 518 void SMConnection::HandleResponseFullyRead() { 519 sm_interface_->Cleanup(); 520 } 521 522 bool SMConnection::DoWrite() { 523 size_t bytes_sent = 0; 524 int flags = MSG_NOSIGNAL | MSG_DONTWAIT; 525 if (fd_ == -1) { 526 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 527 << "DoWrite: fd == -1. Returning false."; 528 return false; 529 } 530 if (output_list_.empty()) { 531 VLOG(2) << log_prefix_ << "DoWrite: Output list empty."; 532 if (sm_interface_) { 533 sm_interface_->GetOutput(); 534 } 535 if (output_list_.empty()) { 536 events_ &= ~EPOLLOUT; 537 } 538 } 539 while (!output_list_.empty()) { 540 VLOG(2) << log_prefix_ << "DoWrite: Items in output list: " 541 << output_list_.size(); 542 if (bytes_sent >= max_bytes_sent_per_dowrite_) { 543 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 544 << " byte sent >= max bytes sent per write: Setting EPOLLOUT: " 545 << bytes_sent; 546 events_ |= EPOLLOUT; 547 break; 548 } 549 if (sm_interface_ && output_list_.size() < 2) { 550 sm_interface_->GetOutput(); 551 } 552 DataFrame* data_frame = output_list_.front(); 553 const char* bytes = data_frame->data; 554 int size = data_frame->size; 555 bytes += data_frame->index; 556 size -= data_frame->index; 557 DCHECK_GE(size, 0); 558 if (size <= 0) { 559 output_list_.pop_front(); 560 delete data_frame; 561 continue; 562 } 563 564 flags = MSG_NOSIGNAL | MSG_DONTWAIT; 565 // Look for a queue size > 1 because |this| frame is remains on the list 566 // until it has finished sending. 567 if (output_list_.size() > 1) { 568 VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size() 569 << ": Adding MSG_MORE flag"; 570 flags |= MSG_MORE; 571 } 572 VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes."; 573 ssize_t bytes_written = Send(bytes, size, flags); 574 int stored_errno = errno; 575 if (bytes_written == -1) { 576 switch (stored_errno) { 577 case EAGAIN: 578 events_ &= ~EPOLLOUT; 579 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 580 << "Got EAGAIN while writing"; 581 goto done; 582 case EINTR: 583 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 584 << "Got EINTR while writing"; 585 continue; 586 default: 587 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 588 << "While calling send, got error: " << stored_errno 589 << ": " << (ssl_?"":strerror(stored_errno)); 590 goto error_or_close; 591 } 592 } else if (bytes_written > 0) { 593 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: " 594 << bytes_written << " bytes"; 595 data_frame->index += bytes_written; 596 bytes_sent += bytes_written; 597 continue; 598 } else if (bytes_written == -2) { 599 // -2 handles SSL_ERROR_WANT_* errors 600 events_ &= ~EPOLLOUT; 601 goto done; 602 } 603 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 604 << "0 bytes written with send call."; 605 goto error_or_close; 606 } 607 done: 608 UncorkSocket(); 609 return true; 610 611 error_or_close: 612 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT 613 << "DoWrite: error_or_close. Returning false " 614 << "after cleaning up"; 615 Cleanup("DoWrite"); 616 UncorkSocket(); 617 return false; 618 } 619 620 void SMConnection::Reset() { 621 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting"; 622 if (ssl_) { 623 SSL_shutdown(ssl_); 624 PrintSslError(); 625 SSL_free(ssl_); 626 PrintSslError(); 627 ssl_ = NULL; 628 } 629 if (registered_in_epoll_server_) { 630 epoll_server_->UnregisterFD(fd_); 631 registered_in_epoll_server_ = false; 632 } 633 if (fd_ >= 0) { 634 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection"; 635 close(fd_); 636 fd_ = -1; 637 } 638 read_buffer_.Clear(); 639 initialized_ = false; 640 protocol_detected_ = false; 641 events_ = 0; 642 for (std::list<DataFrame*>::iterator i = 643 output_list_.begin(); 644 i != output_list_.end(); 645 ++i) { 646 delete *i; 647 } 648 output_list_.clear(); 649 } 650 651 // static 652 SMConnection* SMConnection::NewSMConnection(EpollServer* epoll_server, 653 SSLState *ssl_state, 654 MemoryCache* memory_cache, 655 FlipAcceptor *acceptor, 656 std::string log_prefix) { 657 return new SMConnection(epoll_server, ssl_state, memory_cache, 658 acceptor, log_prefix); 659 } 660 661 } // namespace net 662 663 664