1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/port.h" 29 30 #include <algorithm> 31 #include <vector> 32 33 #include "talk/base/helpers.h" 34 #include "talk/base/logging.h" 35 #include "talk/base/scoped_ptr.h" 36 #include "talk/base/stringutils.h" 37 #include "talk/p2p/base/common.h" 38 39 namespace { 40 41 // The length of time we wait before timing out readability on a connection. 42 const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000; // 30 seconds 43 44 // The length of time we wait before timing out writability on a connection. 45 const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000; // 15 seconds 46 47 // The length of time we wait before we become unwritable. 48 const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000; // 5 seconds 49 50 // The number of pings that must fail to respond before we become unwritable. 51 const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5; 52 53 // This is the length of time that we wait for a ping response to come back. 54 const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000; // 5 seconds 55 56 // Determines whether we have seen at least the given maximum number of 57 // pings fail to have a response. 58 inline bool TooManyFailures( 59 const std::vector<uint32>& pings_since_last_response, 60 uint32 maximum_failures, 61 uint32 rtt_estimate, 62 uint32 now) { 63 64 // If we haven't sent that many pings, then we can't have failed that many. 65 if (pings_since_last_response.size() < maximum_failures) 66 return false; 67 68 // Check if the window in which we would expect a response to the ping has 69 // already elapsed. 70 return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now; 71 } 72 73 // Determines whether we have gone too long without seeing any response. 74 inline bool TooLongWithoutResponse( 75 const std::vector<uint32>& pings_since_last_response, 76 uint32 maximum_time, 77 uint32 now) { 78 79 if (pings_since_last_response.size() == 0) 80 return false; 81 82 return pings_since_last_response[0] + maximum_time < now; 83 } 84 85 // We will restrict RTT estimates (when used for determining state) to be 86 // within a reasonable range. 87 const uint32 MINIMUM_RTT = 100; // 0.1 seconds 88 const uint32 MAXIMUM_RTT = 3000; // 3 seconds 89 90 // When we don't have any RTT data, we have to pick something reasonable. We 91 // use a large value just in case the connection is really slow. 92 const uint32 DEFAULT_RTT = MAXIMUM_RTT; 93 94 // Computes our estimate of the RTT given the current estimate. 95 inline uint32 ConservativeRTTEstimate(uint32 rtt) { 96 return talk_base::_max(MINIMUM_RTT, talk_base::_min(MAXIMUM_RTT, 2 * rtt)); 97 } 98 99 // Weighting of the old rtt value to new data. 100 const int RTT_RATIO = 3; // 3 : 1 101 102 // The delay before we begin checking if this port is useless. 103 const int kPortTimeoutDelay = 30 * 1000; // 30 seconds 104 105 const uint32 MSG_CHECKTIMEOUT = 1; 106 const uint32 MSG_DELETE = 1; 107 } 108 109 namespace cricket { 110 111 static const char* const PROTO_NAMES[] = { "udp", "tcp", "ssltcp" }; 112 113 const char* ProtoToString(ProtocolType proto) { 114 return PROTO_NAMES[proto]; 115 } 116 117 bool StringToProto(const char* value, ProtocolType* proto) { 118 for (size_t i = 0; i <= PROTO_LAST; ++i) { 119 if (strcmp(PROTO_NAMES[i], value) == 0) { 120 *proto = static_cast<ProtocolType>(i); 121 return true; 122 } 123 } 124 return false; 125 } 126 127 Port::Port(talk_base::Thread* thread, const std::string& type, 128 talk_base::PacketSocketFactory* factory, talk_base::Network* network, 129 uint32 ip, int min_port, int max_port) 130 : thread_(thread), 131 factory_(factory), 132 type_(type), 133 network_(network), 134 ip_(ip), 135 min_port_(min_port), 136 max_port_(max_port), 137 preference_(-1), 138 lifetime_(LT_PRESTART), 139 enable_port_packets_(false) { 140 ASSERT(factory_ != NULL); 141 142 set_username_fragment(talk_base::CreateRandomString(16)); 143 set_password(talk_base::CreateRandomString(16)); 144 LOG_J(LS_INFO, this) << "Port created"; 145 } 146 147 Port::~Port() { 148 // Delete all of the remaining connections. We copy the list up front 149 // because each deletion will cause it to be modified. 150 151 std::vector<Connection*> list; 152 153 AddressMap::iterator iter = connections_.begin(); 154 while (iter != connections_.end()) { 155 list.push_back(iter->second); 156 ++iter; 157 } 158 159 for (uint32 i = 0; i < list.size(); i++) 160 delete list[i]; 161 } 162 163 Connection* Port::GetConnection(const talk_base::SocketAddress& remote_addr) { 164 AddressMap::const_iterator iter = connections_.find(remote_addr); 165 if (iter != connections_.end()) 166 return iter->second; 167 else 168 return NULL; 169 } 170 171 void Port::AddAddress(const talk_base::SocketAddress& address, 172 const std::string& protocol, 173 bool final) { 174 Candidate c; 175 c.set_name(name_); 176 c.set_type(type_); 177 c.set_protocol(protocol); 178 c.set_address(address); 179 c.set_preference(preference_); 180 c.set_username(username_frag_); 181 c.set_password(password_); 182 c.set_network_name(network_->name()); 183 c.set_generation(generation_); 184 candidates_.push_back(c); 185 186 if (final) 187 SignalAddressReady(this); 188 } 189 190 void Port::AddConnection(Connection* conn) { 191 connections_[conn->remote_candidate().address()] = conn; 192 conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed); 193 SignalConnectionCreated(this, conn); 194 } 195 196 void Port::OnReadPacket( 197 const char* data, size_t size, const talk_base::SocketAddress& addr) { 198 // If the user has enabled port packets, just hand this over. 199 if (enable_port_packets_) { 200 SignalReadPacket(this, data, size, addr); 201 return; 202 } 203 204 // If this is an authenticated STUN request, then signal unknown address and 205 // send back a proper binding response. 206 StunMessage* msg; 207 std::string remote_username; 208 if (!GetStunMessage(data, size, addr, &msg, &remote_username)) { 209 LOG_J(LS_ERROR, this) << "Received non-STUN packet from unknown address (" 210 << addr.ToString() << ")"; 211 } else if (!msg) { 212 // STUN message handled already 213 } else if (msg->type() == STUN_BINDING_REQUEST) { 214 SignalUnknownAddress(this, addr, msg, remote_username); 215 } else { 216 // NOTE(tschmelcher): This is benign. It occurs if we pruned a 217 // connection for this port while it had STUN requests in flight, because 218 // we then get back responses for them, which this code correctly does not 219 // handle. 220 LOG_J(LS_ERROR, this) << "Received unexpected STUN message type (" 221 << msg->type() << ") from unknown address (" 222 << addr.ToString() << ")"; 223 delete msg; 224 } 225 } 226 227 bool Port::GetStunMessage(const char* data, size_t size, 228 const talk_base::SocketAddress& addr, 229 StunMessage** out_msg, std::string* out_username) { 230 // NOTE: This could clearly be optimized to avoid allocating any memory. 231 // However, at the data rates we'll be looking at on the client side, 232 // this probably isn't worth worrying about. 233 ASSERT(out_msg != NULL); 234 ASSERT(out_username != NULL); 235 *out_msg = NULL; 236 out_username->clear(); 237 238 // Parse the request message. If the packet is not a complete and correct 239 // STUN message, then ignore it. 240 talk_base::scoped_ptr<StunMessage> stun_msg(new StunMessage()); 241 talk_base::ByteBuffer buf(data, size); 242 if (!stun_msg->Read(&buf) || (buf.Length() > 0)) { 243 return false; 244 } 245 246 // The packet must include a username that either begins or ends with our 247 // fragment. It should begin with our fragment if it is a request and it 248 // should end with our fragment if it is a response. 249 const StunByteStringAttribute* username_attr = 250 stun_msg->GetByteString(STUN_ATTR_USERNAME); 251 252 int remote_frag_len = (username_attr ? username_attr->length() : 0); 253 remote_frag_len -= static_cast<int>(username_frag_.size()); 254 255 if (stun_msg->type() == STUN_BINDING_REQUEST) { 256 if (remote_frag_len < 0) { 257 // Username not present or corrupted, don't reply. 258 LOG_J(LS_ERROR, this) << "Received STUN request without username from " 259 << addr.ToString(); 260 return true; 261 } else if (std::memcmp(username_attr->bytes(), username_frag_.c_str(), 262 username_frag_.size()) != 0) { 263 LOG_J(LS_ERROR, this) << "Received STUN request with bad local username " 264 << std::string(username_attr->bytes(), 265 username_attr->length()) << " from " 266 << addr.ToString(); 267 SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_BAD_REQUEST, 268 STUN_ERROR_REASON_BAD_REQUEST); 269 return true; 270 } 271 272 out_username->assign(username_attr->bytes() + username_frag_.size(), 273 username_attr->bytes() + username_attr->length()); 274 } else if ((stun_msg->type() == STUN_BINDING_RESPONSE) 275 || (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) { 276 if (remote_frag_len < 0) { 277 LOG_J(LS_ERROR, this) << "Received STUN response without username from " 278 << addr.ToString(); 279 // Do not send error response to a response 280 return true; 281 } else if (std::memcmp(username_attr->bytes() + remote_frag_len, 282 username_frag_.c_str(), 283 username_frag_.size()) != 0) { 284 LOG_J(LS_ERROR, this) << "Received STUN response with bad local username " 285 << std::string(username_attr->bytes(), 286 username_attr->length()) << " from " 287 << addr.ToString(); 288 // Do not send error response to a response 289 return true; 290 } 291 292 out_username->assign(username_attr->bytes(), 293 username_attr->bytes() + remote_frag_len); 294 295 if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) { 296 if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) { 297 LOG_J(LS_ERROR, this) << "Received STUN binding error:" 298 << " class=" 299 << static_cast<int>(error_code->error_class()) 300 << " number=" 301 << static_cast<int>(error_code->number()) 302 << " reason='" << error_code->reason() << "'" 303 << " from " << addr.ToString(); 304 // Return message to allow error-specific processing 305 } else { 306 LOG_J(LS_ERROR, this) << "Received STUN binding error without a error " 307 << "code from " << addr.ToString(); 308 // Drop corrupt message 309 return true; 310 } 311 } 312 } else { 313 LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type (" 314 << stun_msg->type() << ") from " << addr.ToString(); 315 return true; 316 } 317 318 // Return the STUN message found. 319 *out_msg = stun_msg.release(); 320 return true; 321 } 322 323 void Port::SendBindingResponse(StunMessage* request, 324 const talk_base::SocketAddress& addr) { 325 ASSERT(request->type() == STUN_BINDING_REQUEST); 326 327 // Retrieve the username from the request. 328 const StunByteStringAttribute* username_attr = 329 request->GetByteString(STUN_ATTR_USERNAME); 330 ASSERT(username_attr != NULL); 331 if (username_attr == NULL) { 332 // No valid username, skip the response. 333 return; 334 } 335 336 // Fill in the response message. 337 StunMessage response; 338 response.SetType(STUN_BINDING_RESPONSE); 339 response.SetTransactionID(request->transaction_id()); 340 341 StunByteStringAttribute* username2_attr = 342 StunAttribute::CreateByteString(STUN_ATTR_USERNAME); 343 username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); 344 response.AddAttribute(username2_attr); 345 346 StunAddressAttribute* addr_attr = 347 StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS); 348 addr_attr->SetFamily(1); 349 addr_attr->SetPort(addr.port()); 350 addr_attr->SetIP(addr.ip()); 351 response.AddAttribute(addr_attr); 352 353 // Send the response message. 354 // NOTE: If we wanted to, this is where we would add the HMAC. 355 talk_base::ByteBuffer buf; 356 response.Write(&buf); 357 if (SendTo(buf.Data(), buf.Length(), addr, false) < 0) { 358 LOG_J(LS_ERROR, this) << "Failed to send STUN ping response to " 359 << addr.ToString(); 360 } 361 362 // The fact that we received a successful request means that this connection 363 // (if one exists) should now be readable. 364 Connection* conn = GetConnection(addr); 365 ASSERT(conn != NULL); 366 if (conn) 367 conn->ReceivedPing(); 368 } 369 370 void Port::SendBindingErrorResponse(StunMessage* request, 371 const talk_base::SocketAddress& addr, 372 int error_code, const std::string& reason) { 373 ASSERT(request->type() == STUN_BINDING_REQUEST); 374 375 // Retrieve the username from the request. If it didn't have one, we 376 // shouldn't be responding at all. 377 const StunByteStringAttribute* username_attr = 378 request->GetByteString(STUN_ATTR_USERNAME); 379 ASSERT(username_attr != NULL); 380 if (username_attr == NULL) { 381 // No valid username, skip the response. 382 return; 383 } 384 385 // Fill in the response message. 386 StunMessage response; 387 response.SetType(STUN_BINDING_ERROR_RESPONSE); 388 response.SetTransactionID(request->transaction_id()); 389 390 StunByteStringAttribute* username2_attr = 391 StunAttribute::CreateByteString(STUN_ATTR_USERNAME); 392 username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); 393 response.AddAttribute(username2_attr); 394 395 StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode(); 396 error_attr->SetErrorCode(error_code); 397 error_attr->SetReason(reason); 398 response.AddAttribute(error_attr); 399 400 // Send the response message. 401 // NOTE: If we wanted to, this is where we would add the HMAC. 402 talk_base::ByteBuffer buf; 403 response.Write(&buf); 404 SendTo(buf.Data(), buf.Length(), addr, false); 405 LOG_J(LS_INFO, this) << "Sending STUN binding error: reason=" << reason 406 << " to " << addr.ToString(); 407 } 408 409 void Port::OnMessage(talk_base::Message *pmsg) { 410 ASSERT(pmsg->message_id == MSG_CHECKTIMEOUT); 411 ASSERT(lifetime_ == LT_PRETIMEOUT); 412 lifetime_ = LT_POSTTIMEOUT; 413 CheckTimeout(); 414 } 415 416 std::string Port::ToString() const { 417 std::stringstream ss; 418 ss << "Port[" << name_ << ":" << type_ << ":" << network_->ToString() << "]"; 419 return ss.str(); 420 } 421 422 void Port::EnablePortPackets() { 423 enable_port_packets_ = true; 424 } 425 426 void Port::Start() { 427 // The port sticks around for a minimum lifetime, after which 428 // we destroy it when it drops to zero connections. 429 if (lifetime_ == LT_PRESTART) { 430 lifetime_ = LT_PRETIMEOUT; 431 thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT); 432 } else { 433 LOG_J(LS_WARNING, this) << "Port restart attempted"; 434 } 435 } 436 437 void Port::OnConnectionDestroyed(Connection* conn) { 438 AddressMap::iterator iter = 439 connections_.find(conn->remote_candidate().address()); 440 ASSERT(iter != connections_.end()); 441 connections_.erase(iter); 442 443 CheckTimeout(); 444 } 445 446 void Port::Destroy() { 447 ASSERT(connections_.empty()); 448 LOG_J(LS_INFO, this) << "Port deleted"; 449 SignalDestroyed(this); 450 delete this; 451 } 452 453 void Port::CheckTimeout() { 454 // If this port has no connections, then there's no reason to keep it around. 455 // When the connections time out (both read and write), they will delete 456 // themselves, so if we have any connections, they are either readable or 457 // writable (or still connecting). 458 if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) { 459 Destroy(); 460 } 461 } 462 463 // A ConnectionRequest is a simple STUN ping used to determine writability. 464 class ConnectionRequest : public StunRequest { 465 public: 466 explicit ConnectionRequest(Connection* connection) : connection_(connection) { 467 } 468 469 virtual ~ConnectionRequest() { 470 } 471 472 virtual void Prepare(StunMessage* request) { 473 request->SetType(STUN_BINDING_REQUEST); 474 StunByteStringAttribute* username_attr = 475 StunAttribute::CreateByteString(STUN_ATTR_USERNAME); 476 std::string username = connection_->remote_candidate().username(); 477 username.append(connection_->port()->username_fragment()); 478 username_attr->CopyBytes(username.c_str(), username.size()); 479 request->AddAttribute(username_attr); 480 } 481 482 virtual void OnResponse(StunMessage* response) { 483 connection_->OnConnectionRequestResponse(this, response); 484 } 485 486 virtual void OnErrorResponse(StunMessage* response) { 487 connection_->OnConnectionRequestErrorResponse(this, response); 488 } 489 490 virtual void OnTimeout() { 491 connection_->OnConnectionRequestTimeout(this); 492 } 493 494 virtual int GetNextDelay() { 495 // Each request is sent only once. After a single delay , the request will 496 // time out. 497 timeout_ = true; 498 return CONNECTION_RESPONSE_TIMEOUT; 499 } 500 501 private: 502 Connection* connection_; 503 }; 504 505 // 506 // Connection 507 // 508 509 Connection::Connection(Port* port, size_t index, 510 const Candidate& remote_candidate) 511 : port_(port), local_candidate_index_(index), 512 remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT), 513 write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false), 514 requests_(port->thread()), rtt_(DEFAULT_RTT), 515 last_ping_sent_(0), last_ping_received_(0), last_data_received_(0), 516 reported_(false) { 517 // Wire up to send stun packets 518 requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket); 519 LOG_J(LS_INFO, this) << "Connection created"; 520 } 521 522 Connection::~Connection() { 523 } 524 525 const Candidate& Connection::local_candidate() const { 526 if (local_candidate_index_ < port_->candidates().size()) 527 return port_->candidates()[local_candidate_index_]; 528 ASSERT(false); 529 static Candidate foo; 530 return foo; 531 } 532 533 void Connection::set_read_state(ReadState value) { 534 ReadState old_value = read_state_; 535 read_state_ = value; 536 if (value != old_value) { 537 LOG_J(LS_VERBOSE, this) << "set_read_state"; 538 SignalStateChange(this); 539 CheckTimeout(); 540 } 541 } 542 543 void Connection::set_write_state(WriteState value) { 544 WriteState old_value = write_state_; 545 write_state_ = value; 546 if (value != old_value) { 547 LOG_J(LS_VERBOSE, this) << "set_write_state"; 548 SignalStateChange(this); 549 CheckTimeout(); 550 } 551 } 552 553 void Connection::set_connected(bool value) { 554 bool old_value = connected_; 555 connected_ = value; 556 if (value != old_value) { 557 LOG_J(LS_VERBOSE, this) << "set_connected"; 558 } 559 } 560 561 void Connection::OnSendStunPacket(const void* data, size_t size, 562 StunRequest* req) { 563 if (port_->SendTo(data, size, remote_candidate_.address(), false) < 0) { 564 LOG_J(LS_WARNING, this) << "Failed to send STUN ping " << req->id(); 565 } 566 } 567 568 void Connection::OnReadPacket(const char* data, size_t size) { 569 StunMessage* msg; 570 std::string remote_username; 571 const talk_base::SocketAddress& addr(remote_candidate_.address()); 572 if (!port_->GetStunMessage(data, size, addr, &msg, &remote_username)) { 573 // The packet did not parse as a valid STUN message 574 575 // If this connection is readable, then pass along the packet. 576 if (read_state_ == STATE_READABLE) { 577 // readable means data from this address is acceptable 578 // Send it on! 579 580 last_data_received_ = talk_base::Time(); 581 recv_rate_tracker_.Update(size); 582 SignalReadPacket(this, data, size); 583 584 // If timed out sending writability checks, start up again 585 if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) 586 set_write_state(STATE_WRITE_CONNECT); 587 } else { 588 // Not readable means the remote address hasn't sent a valid 589 // binding request yet. 590 591 LOG_J(LS_WARNING, this) 592 << "Received non-STUN packet from an unreadable connection."; 593 } 594 } else if (!msg) { 595 // The packet was STUN, but was already handled internally. 596 } else if (remote_username != remote_candidate_.username()) { 597 // The packet had the right local username, but the remote username was 598 // not the right one for the remote address. 599 if (msg->type() == STUN_BINDING_REQUEST) { 600 LOG_J(LS_ERROR, this) << "Received STUN request with bad remote username " 601 << remote_username; 602 port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST, 603 STUN_ERROR_REASON_BAD_REQUEST); 604 } else if (msg->type() == STUN_BINDING_RESPONSE || 605 msg->type() == STUN_BINDING_ERROR_RESPONSE) { 606 LOG_J(LS_ERROR, this) << "Received STUN response with bad remote username" 607 " " << remote_username; 608 } 609 delete msg; 610 } else { 611 // The packet is STUN, with the right username. 612 // If this is a STUN request, then update the readable bit and respond. 613 // If this is a STUN response, then update the writable bit. 614 615 switch (msg->type()) { 616 case STUN_BINDING_REQUEST: 617 // Incoming, validated stun request from remote peer. 618 // This call will also set the connection readable. 619 620 port_->SendBindingResponse(msg, addr); 621 622 // If timed out sending writability checks, start up again 623 if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) 624 set_write_state(STATE_WRITE_CONNECT); 625 break; 626 627 case STUN_BINDING_RESPONSE: 628 case STUN_BINDING_ERROR_RESPONSE: 629 // Response from remote peer. Does it match request sent? 630 // This doesn't just check, it makes callbacks if transaction 631 // id's match 632 requests_.CheckResponse(msg); 633 break; 634 635 default: 636 ASSERT(false); 637 break; 638 } 639 640 // Done with the message; delete 641 642 delete msg; 643 } 644 } 645 646 void Connection::Prune() { 647 if (!pruned_) { 648 LOG_J(LS_VERBOSE, this) << "Connection pruned"; 649 pruned_ = true; 650 requests_.Clear(); 651 set_write_state(STATE_WRITE_TIMEOUT); 652 } 653 } 654 655 void Connection::Destroy() { 656 LOG_J(LS_VERBOSE, this) << "Connection destroyed"; 657 set_read_state(STATE_READ_TIMEOUT); 658 set_write_state(STATE_WRITE_TIMEOUT); 659 } 660 661 void Connection::UpdateState(uint32 now) { 662 uint32 rtt = ConservativeRTTEstimate(rtt_); 663 664 std::string pings; 665 for (size_t i = 0; i < pings_since_last_response_.size(); ++i) { 666 char buf[32]; 667 talk_base::sprintfn(buf, sizeof(buf), "%u", 668 pings_since_last_response_[i]); 669 pings.append(buf).append(" "); 670 } 671 LOG_J(LS_VERBOSE, this) << "UpdateState(): pings_since_last_response_=" << 672 pings << ", rtt=" << rtt << ", now=" << now; 673 674 // Check the readable state. 675 // 676 // Since we don't know how many pings the other side has attempted, the best 677 // test we can do is a simple window. 678 679 if ((read_state_ == STATE_READABLE) && 680 (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) { 681 LOG_J(LS_INFO, this) << "Unreadable after " 682 << now - last_ping_received_ 683 << " ms without a ping, rtt=" << rtt; 684 set_read_state(STATE_READ_TIMEOUT); 685 } 686 687 // Check the writable state. (The order of these checks is important.) 688 // 689 // Before becoming unwritable, we allow for a fixed number of pings to fail 690 // (i.e., receive no response). We also have to give the response time to 691 // get back, so we include a conservative estimate of this. 692 // 693 // Before timing out writability, we give a fixed amount of time. This is to 694 // allow for changes in network conditions. 695 696 if ((write_state_ == STATE_WRITABLE) && 697 TooManyFailures(pings_since_last_response_, 698 CONNECTION_WRITE_CONNECT_FAILURES, 699 rtt, 700 now) && 701 TooLongWithoutResponse(pings_since_last_response_, 702 CONNECTION_WRITE_CONNECT_TIMEOUT, 703 now)) { 704 uint32 max_pings = CONNECTION_WRITE_CONNECT_FAILURES; 705 LOG_J(LS_INFO, this) << "Unwritable after " << max_pings 706 << " ping failures and " 707 << now - pings_since_last_response_[0] 708 << " ms without a response," 709 << " ms since last received ping=" 710 << now - last_ping_received_ 711 << " ms since last received data=" 712 << now - last_data_received_ 713 << " rtt=" << rtt; 714 set_write_state(STATE_WRITE_CONNECT); 715 } 716 717 if ((write_state_ == STATE_WRITE_CONNECT) && 718 TooLongWithoutResponse(pings_since_last_response_, 719 CONNECTION_WRITE_TIMEOUT, 720 now)) { 721 LOG_J(LS_INFO, this) << "Timed out after " 722 << now - pings_since_last_response_[0] 723 << " ms without a response, rtt=" << rtt; 724 set_write_state(STATE_WRITE_TIMEOUT); 725 } 726 } 727 728 void Connection::Ping(uint32 now) { 729 ASSERT(connected_); 730 last_ping_sent_ = now; 731 pings_since_last_response_.push_back(now); 732 ConnectionRequest *req = new ConnectionRequest(this); 733 LOG_J(LS_VERBOSE, this) << "Sending STUN ping " << req->id() << " at " << now; 734 requests_.Send(req); 735 } 736 737 void Connection::ReceivedPing() { 738 last_ping_received_ = talk_base::Time(); 739 set_read_state(STATE_READABLE); 740 } 741 742 std::string Connection::ToString() const { 743 const char CONNECT_STATE_ABBREV[2] = { 744 '-', // not connected (false) 745 'C', // connected (true) 746 }; 747 const char READ_STATE_ABBREV[2] = { 748 'R', // STATE_READABLE 749 '-', // STATE_READ_TIMEOUT 750 }; 751 const char WRITE_STATE_ABBREV[3] = { 752 'W', // STATE_WRITABLE 753 'w', // STATE_WRITE_CONNECT 754 '-', // STATE_WRITE_TIMEOUT 755 }; 756 const Candidate& local = local_candidate(); 757 const Candidate& remote = remote_candidate(); 758 std::stringstream ss; 759 ss << "Conn[" << local.generation() 760 << ":" << local.name() << ":" << local.type() << ":" 761 << local.protocol() << ":" << local.address().ToString() 762 << "->" << remote.name() << ":" << remote.type() << ":" 763 << remote.protocol() << ":" << remote.address().ToString() 764 << "|" 765 << CONNECT_STATE_ABBREV[connected()] 766 << READ_STATE_ABBREV[read_state()] 767 << WRITE_STATE_ABBREV[write_state()] 768 << "|"; 769 if (rtt_ < DEFAULT_RTT) { 770 ss << rtt_ << "]"; 771 } else { 772 ss << "-]"; 773 } 774 return ss.str(); 775 } 776 777 void Connection::OnConnectionRequestResponse(ConnectionRequest* request, 778 StunMessage* response) { 779 // We've already validated that this is a STUN binding response with 780 // the correct local and remote username for this connection. 781 // So if we're not already, become writable. We may be bringing a pruned 782 // connection back to life, but if we don't really want it, we can always 783 // prune it again. 784 uint32 rtt = request->Elapsed(); 785 set_write_state(STATE_WRITABLE); 786 787 std::string pings; 788 for (size_t i = 0; i < pings_since_last_response_.size(); ++i) { 789 char buf[32]; 790 talk_base::sprintfn(buf, sizeof(buf), "%u", 791 pings_since_last_response_[i]); 792 pings.append(buf).append(" "); 793 } 794 795 LOG_J(LS_VERBOSE, this) << "Received STUN ping response " << request->id() 796 << ", pings_since_last_response_=" << pings 797 << ", rtt=" << rtt; 798 799 pings_since_last_response_.clear(); 800 rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1); 801 } 802 803 void Connection::OnConnectionRequestErrorResponse(ConnectionRequest* request, 804 StunMessage* response) { 805 const StunErrorCodeAttribute* error = response->GetErrorCode(); 806 uint32 error_code = error ? 807 error->error_code() : static_cast<uint32>(STUN_ERROR_GLOBAL_FAILURE); 808 809 if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE) 810 || (error_code == STUN_ERROR_SERVER_ERROR) 811 || (error_code == STUN_ERROR_UNAUTHORIZED)) { 812 // Recoverable error, retry 813 } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) { 814 // Race failure, retry 815 } else { 816 // This is not a valid connection. 817 LOG_J(LS_ERROR, this) << "Received STUN error response, code=" 818 << error_code << "; killing connection"; 819 set_write_state(STATE_WRITE_TIMEOUT); 820 } 821 } 822 823 void Connection::OnConnectionRequestTimeout(ConnectionRequest* request) { 824 // Log at LS_INFO if we miss a ping on a writable connection. 825 talk_base::LoggingSeverity sev = (write_state_ == STATE_WRITABLE) ? 826 talk_base::LS_INFO : talk_base::LS_VERBOSE; 827 uint32 when = talk_base::Time() - request->Elapsed(); 828 size_t failures; 829 for (failures = 0; failures < pings_since_last_response_.size(); ++failures) { 830 if (pings_since_last_response_[failures] > when) { 831 break; 832 } 833 } 834 LOG_JV(sev, this) << "Timing-out STUN ping " << request->id() 835 << " after " << request->Elapsed() 836 << " ms, failures=" << failures; 837 } 838 839 void Connection::CheckTimeout() { 840 // If both read and write have timed out, then this connection can contribute 841 // no more to p2p socket unless at some later date readability were to come 842 // back. However, we gave readability a long time to timeout, so at this 843 // point, it seems fair to get rid of this connection. 844 if ((read_state_ == STATE_READ_TIMEOUT) && 845 (write_state_ == STATE_WRITE_TIMEOUT)) { 846 port_->thread()->Post(this, MSG_DELETE); 847 } 848 } 849 850 void Connection::OnMessage(talk_base::Message *pmsg) { 851 ASSERT(pmsg->message_id == MSG_DELETE); 852 853 LOG_J(LS_INFO, this) << "Connection deleted"; 854 SignalDestroyed(this); 855 delete this; 856 } 857 858 size_t Connection::recv_bytes_second() { 859 return recv_rate_tracker_.units_second(); 860 } 861 862 size_t Connection::recv_total_bytes() { 863 return recv_rate_tracker_.total_units(); 864 } 865 866 size_t Connection::sent_bytes_second() { 867 return send_rate_tracker_.units_second(); 868 } 869 870 size_t Connection::sent_total_bytes() { 871 return send_rate_tracker_.total_units(); 872 } 873 874 ProxyConnection::ProxyConnection(Port* port, size_t index, 875 const Candidate& candidate) 876 : Connection(port, index, candidate), error_(0) { 877 } 878 879 int ProxyConnection::Send(const void* data, size_t size) { 880 if (write_state() != STATE_WRITABLE) { 881 error_ = EWOULDBLOCK; 882 return SOCKET_ERROR; 883 } 884 int sent = port_->SendTo(data, size, remote_candidate_.address(), true); 885 if (sent <= 0) { 886 ASSERT(sent < 0); 887 error_ = port_->GetError(); 888 } else { 889 send_rate_tracker_.Update(sent); 890 } 891 return sent; 892 } 893 894 } // namespace cricket 895