1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/p2ptransportchannel.h" 29 30 #include <set> 31 #include "talk/base/common.h" 32 #include "talk/base/crc32.h" 33 #include "talk/base/logging.h" 34 #include "talk/base/stringencode.h" 35 #include "talk/p2p/base/common.h" 36 #include "talk/p2p/base/relayport.h" // For RELAY_PORT_TYPE. 37 #include "talk/p2p/base/stunport.h" // For STUN_PORT_TYPE. 38 39 namespace { 40 41 // messages for queuing up work for ourselves 42 enum { 43 MSG_SORT = 1, 44 MSG_PING, 45 }; 46 47 // When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers) 48 // for pinging. When the socket is writable, we will use only 1 Kbps because 49 // we don't want to degrade the quality on a modem. These numbers should work 50 // well on a 28.8K modem, which is the slowest connection on which the voice 51 // quality is reasonable at all. 52 static const uint32 PING_PACKET_SIZE = 60 * 8; 53 static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480ms 54 static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000; // 50ms 55 56 // If there is a current writable connection, then we will also try hard to 57 // make sure it is pinged at this rate. 58 static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit 59 60 // The minimum improvement in RTT that justifies a switch. 61 static const double kMinImprovement = 10; 62 63 cricket::PortInterface::CandidateOrigin GetOrigin(cricket::PortInterface* port, 64 cricket::PortInterface* origin_port) { 65 if (!origin_port) 66 return cricket::PortInterface::ORIGIN_MESSAGE; 67 else if (port == origin_port) 68 return cricket::PortInterface::ORIGIN_THIS_PORT; 69 else 70 return cricket::PortInterface::ORIGIN_OTHER_PORT; 71 } 72 73 // Compares two connections based only on static information about them. 74 int CompareConnectionCandidates(cricket::Connection* a, 75 cricket::Connection* b) { 76 // Compare connection priority. Lower values get sorted last. 77 if (a->priority() > b->priority()) 78 return 1; 79 if (a->priority() < b->priority()) 80 return -1; 81 82 // If we're still tied at this point, prefer a younger generation. 83 return (a->remote_candidate().generation() + a->port()->generation()) - 84 (b->remote_candidate().generation() + b->port()->generation()); 85 } 86 87 // Compare two connections based on their writability and static preferences. 88 int CompareConnections(cricket::Connection *a, cricket::Connection *b) { 89 // Sort based on write-state. Better states have lower values. 90 if (a->write_state() < b->write_state()) 91 return 1; 92 if (a->write_state() > b->write_state()) 93 return -1; 94 95 // Compare the candidate information. 96 return CompareConnectionCandidates(a, b); 97 } 98 99 // Wraps the comparison connection into a less than operator that puts higher 100 // priority writable connections first. 101 class ConnectionCompare { 102 public: 103 bool operator()(const cricket::Connection *ca, 104 const cricket::Connection *cb) { 105 cricket::Connection* a = const_cast<cricket::Connection*>(ca); 106 cricket::Connection* b = const_cast<cricket::Connection*>(cb); 107 108 ASSERT(a->port()->IceProtocol() == b->port()->IceProtocol()); 109 110 // Compare first on writability and static preferences. 111 int cmp = CompareConnections(a, b); 112 if (cmp > 0) 113 return true; 114 if (cmp < 0) 115 return false; 116 117 // Otherwise, sort based on latency estimate. 118 return a->rtt() < b->rtt(); 119 120 // Should we bother checking for the last connection that last received 121 // data? It would help rendezvous on the connection that is also receiving 122 // packets. 123 // 124 // TODO: Yes we should definitely do this. The TCP protocol gains 125 // efficiency by being used bidirectionally, as opposed to two separate 126 // unidirectional streams. This test should probably occur before 127 // comparison of local prefs (assuming combined prefs are the same). We 128 // need to be careful though, not to bounce back and forth with both sides 129 // trying to rendevous with the other. 130 } 131 }; 132 133 // Determines whether we should switch between two connections, based first on 134 // static preferences and then (if those are equal) on latency estimates. 135 bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) { 136 if (a_conn == b_conn) 137 return false; 138 139 if (!a_conn || !b_conn) // don't think the latter should happen 140 return true; 141 142 int prefs_cmp = CompareConnections(a_conn, b_conn); 143 if (prefs_cmp < 0) 144 return true; 145 if (prefs_cmp > 0) 146 return false; 147 148 return b_conn->rtt() <= a_conn->rtt() + kMinImprovement; 149 } 150 151 } // unnamed namespace 152 153 namespace cricket { 154 155 P2PTransportChannel::P2PTransportChannel(const std::string& content_name, 156 int component, 157 P2PTransport* transport, 158 PortAllocator *allocator) : 159 TransportChannelImpl(content_name, component), 160 transport_(transport), 161 allocator_(allocator), 162 worker_thread_(talk_base::Thread::Current()), 163 incoming_only_(false), 164 waiting_for_signaling_(false), 165 error_(0), 166 best_connection_(NULL), 167 pending_best_connection_(NULL), 168 sort_dirty_(false), 169 was_writable_(false), 170 protocol_type_(ICEPROTO_GOOGLE), 171 remote_ice_mode_(ICEMODE_FULL), 172 ice_role_(ICEROLE_UNKNOWN), 173 tiebreaker_(0), 174 remote_candidate_generation_(0) { 175 } 176 177 P2PTransportChannel::~P2PTransportChannel() { 178 ASSERT(worker_thread_ == talk_base::Thread::Current()); 179 180 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 181 delete allocator_sessions_[i]; 182 } 183 184 // Add the allocator session to our list so that we know which sessions 185 // are still active. 186 void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) { 187 session->set_generation(static_cast<uint32>(allocator_sessions_.size())); 188 allocator_sessions_.push_back(session); 189 190 // We now only want to apply new candidates that we receive to the ports 191 // created by this new session because these are replacing those of the 192 // previous sessions. 193 ports_.clear(); 194 195 session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady); 196 session->SignalCandidatesReady.connect( 197 this, &P2PTransportChannel::OnCandidatesReady); 198 session->SignalCandidatesAllocationDone.connect( 199 this, &P2PTransportChannel::OnCandidatesAllocationDone); 200 session->StartGettingPorts(); 201 } 202 203 void P2PTransportChannel::AddConnection(Connection* connection) { 204 connections_.push_back(connection); 205 connection->set_remote_ice_mode(remote_ice_mode_); 206 connection->SignalReadPacket.connect( 207 this, &P2PTransportChannel::OnReadPacket); 208 connection->SignalReadyToSend.connect( 209 this, &P2PTransportChannel::OnReadyToSend); 210 connection->SignalStateChange.connect( 211 this, &P2PTransportChannel::OnConnectionStateChange); 212 connection->SignalDestroyed.connect( 213 this, &P2PTransportChannel::OnConnectionDestroyed); 214 connection->SignalUseCandidate.connect( 215 this, &P2PTransportChannel::OnUseCandidate); 216 } 217 218 void P2PTransportChannel::SetIceRole(IceRole ice_role) { 219 ASSERT(worker_thread_ == talk_base::Thread::Current()); 220 if (ice_role_ != ice_role) { 221 ice_role_ = ice_role; 222 for (std::vector<PortInterface *>::iterator it = ports_.begin(); 223 it != ports_.end(); ++it) { 224 (*it)->SetIceRole(ice_role); 225 } 226 } 227 } 228 229 void P2PTransportChannel::SetIceTiebreaker(uint64 tiebreaker) { 230 ASSERT(worker_thread_ == talk_base::Thread::Current()); 231 if (!ports_.empty()) { 232 LOG(LS_ERROR) 233 << "Attempt to change tiebreaker after Port has been allocated."; 234 return; 235 } 236 237 tiebreaker_ = tiebreaker; 238 } 239 240 void P2PTransportChannel::SetIceProtocolType(IceProtocolType type) { 241 ASSERT(worker_thread_ == talk_base::Thread::Current()); 242 243 protocol_type_ = type; 244 for (std::vector<PortInterface *>::iterator it = ports_.begin(); 245 it != ports_.end(); ++it) { 246 (*it)->SetIceProtocolType(protocol_type_); 247 } 248 } 249 250 void P2PTransportChannel::SetIceCredentials(const std::string& ice_ufrag, 251 const std::string& ice_pwd) { 252 ASSERT(worker_thread_ == talk_base::Thread::Current()); 253 bool ice_restart = false; 254 if (!ice_ufrag_.empty() && !ice_pwd_.empty()) { 255 // Restart candidate allocation if there is any change in either 256 // ice ufrag or password. 257 ice_restart = (ice_ufrag_ != ice_ufrag) || (ice_pwd_!= ice_pwd); 258 } 259 260 ice_ufrag_ = ice_ufrag; 261 ice_pwd_ = ice_pwd; 262 263 if (ice_restart) { 264 // Restart candidate gathering. 265 Allocate(); 266 } 267 } 268 269 void P2PTransportChannel::SetRemoteIceCredentials(const std::string& ice_ufrag, 270 const std::string& ice_pwd) { 271 ASSERT(worker_thread_ == talk_base::Thread::Current()); 272 bool ice_restart = false; 273 if (!remote_ice_ufrag_.empty() && !remote_ice_pwd_.empty()) { 274 ice_restart = (remote_ice_ufrag_ != ice_ufrag) || 275 (remote_ice_pwd_!= ice_pwd); 276 } 277 278 remote_ice_ufrag_ = ice_ufrag; 279 remote_ice_pwd_ = ice_pwd; 280 281 if (ice_restart) { 282 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245. 283 // Therefore we need to keep track of the remote ice restart so 284 // newer connections are prioritized over the older. 285 ++remote_candidate_generation_; 286 } 287 } 288 289 void P2PTransportChannel::SetRemoteIceMode(IceMode mode) { 290 remote_ice_mode_ = mode; 291 } 292 293 // Go into the state of processing candidates, and running in general 294 void P2PTransportChannel::Connect() { 295 ASSERT(worker_thread_ == talk_base::Thread::Current()); 296 if (ice_ufrag_.empty() || ice_pwd_.empty()) { 297 ASSERT(false); 298 LOG(LS_ERROR) << "P2PTransportChannel::Connect: The ice_ufrag_ and the " 299 << "ice_pwd_ are not set."; 300 return; 301 } 302 303 // Kick off an allocator session 304 Allocate(); 305 306 // Start pinging as the ports come in. 307 thread()->Post(this, MSG_PING); 308 } 309 310 // Reset the socket, clear up any previous allocations and start over 311 void P2PTransportChannel::Reset() { 312 ASSERT(worker_thread_ == talk_base::Thread::Current()); 313 314 // Get rid of all the old allocators. This should clean up everything. 315 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 316 delete allocator_sessions_[i]; 317 318 allocator_sessions_.clear(); 319 ports_.clear(); 320 connections_.clear(); 321 best_connection_ = NULL; 322 323 // Forget about all of the candidates we got before. 324 remote_candidates_.clear(); 325 326 // Revert to the initial state. 327 set_readable(false); 328 set_writable(false); 329 330 // Reinitialize the rest of our state. 331 waiting_for_signaling_ = false; 332 sort_dirty_ = false; 333 334 // If we allocated before, start a new one now. 335 if (transport_->connect_requested()) 336 Allocate(); 337 338 // Start pinging as the ports come in. 339 thread()->Clear(this); 340 thread()->Post(this, MSG_PING); 341 } 342 343 // A new port is available, attempt to make connections for it 344 void P2PTransportChannel::OnPortReady(PortAllocatorSession *session, 345 PortInterface* port) { 346 ASSERT(worker_thread_ == talk_base::Thread::Current()); 347 348 // Set in-effect options on the new port 349 for (OptionMap::const_iterator it = options_.begin(); 350 it != options_.end(); 351 ++it) { 352 int val = port->SetOption(it->first, it->second); 353 if (val < 0) { 354 LOG_J(LS_WARNING, port) << "SetOption(" << it->first 355 << ", " << it->second 356 << ") failed: " << port->GetError(); 357 } 358 } 359 360 // Remember the ports and candidates, and signal that candidates are ready. 361 // The session will handle this, and send an initiate/accept/modify message 362 // if one is pending. 363 364 port->SetIceProtocolType(protocol_type_); 365 port->SetIceRole(ice_role_); 366 port->SetIceTiebreaker(tiebreaker_); 367 ports_.push_back(port); 368 port->SignalUnknownAddress.connect( 369 this, &P2PTransportChannel::OnUnknownAddress); 370 port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed); 371 port->SignalRoleConflict.connect( 372 this, &P2PTransportChannel::OnRoleConflict); 373 374 // Attempt to create a connection from this new port to all of the remote 375 // candidates that we were given so far. 376 377 std::vector<RemoteCandidate>::iterator iter; 378 for (iter = remote_candidates_.begin(); iter != remote_candidates_.end(); 379 ++iter) { 380 CreateConnection(port, *iter, iter->origin_port(), false); 381 } 382 383 SortConnections(); 384 } 385 386 // A new candidate is available, let listeners know 387 void P2PTransportChannel::OnCandidatesReady( 388 PortAllocatorSession *session, const std::vector<Candidate>& candidates) { 389 ASSERT(worker_thread_ == talk_base::Thread::Current()); 390 for (size_t i = 0; i < candidates.size(); ++i) { 391 SignalCandidateReady(this, candidates[i]); 392 } 393 } 394 395 void P2PTransportChannel::OnCandidatesAllocationDone( 396 PortAllocatorSession* session) { 397 ASSERT(worker_thread_ == talk_base::Thread::Current()); 398 SignalCandidatesAllocationDone(this); 399 } 400 401 // Handle stun packets 402 void P2PTransportChannel::OnUnknownAddress( 403 PortInterface* port, 404 const talk_base::SocketAddress& address, ProtocolType proto, 405 IceMessage* stun_msg, const std::string &remote_username, 406 bool port_muxed) { 407 ASSERT(worker_thread_ == talk_base::Thread::Current()); 408 409 // Port has received a valid stun packet from an address that no Connection 410 // is currently available for. See if we already have a candidate with the 411 // address. If it isn't we need to create new candidate for it. 412 413 // Determine if the remote candidates use shared ufrag. 414 bool ufrag_per_port = false; 415 std::vector<RemoteCandidate>::iterator it; 416 if (remote_candidates_.size() > 0) { 417 it = remote_candidates_.begin(); 418 std::string username = it->username(); 419 for (; it != remote_candidates_.end(); ++it) { 420 if (it->username() != username) { 421 ufrag_per_port = true; 422 break; 423 } 424 } 425 } 426 427 const Candidate* candidate = NULL; 428 bool known_username = false; 429 std::string remote_password; 430 for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) { 431 if (it->username() == remote_username) { 432 remote_password = it->password(); 433 known_username = true; 434 if (ufrag_per_port || 435 (it->address() == address && 436 it->protocol() == ProtoToString(proto))) { 437 candidate = &(*it); 438 break; 439 } 440 // We don't want to break here because we may find a match of the address 441 // later. 442 } 443 } 444 445 if (!known_username) { 446 if (port_muxed) { 447 // When Ports are muxed, SignalUnknownAddress is delivered to all 448 // P2PTransportChannel belong to a session. Return from here will 449 // save us from sending stun binding error message from incorrect channel. 450 return; 451 } 452 // Don't know about this username, the request is bogus 453 // This sometimes happens if a binding response comes in before the ACCEPT 454 // message. It is totally valid; the retry state machine will try again. 455 port->SendBindingErrorResponse(stun_msg, address, 456 STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS); 457 return; 458 } 459 460 Candidate new_remote_candidate; 461 if (candidate != NULL) { 462 new_remote_candidate = *candidate; 463 if (ufrag_per_port) { 464 new_remote_candidate.set_address(address); 465 } 466 } else { 467 // Create a new candidate with this address. 468 469 std::string type; 470 if (protocol_type_ == ICEPROTO_RFC5245) { 471 type = PRFLX_PORT_TYPE; 472 } else { 473 // G-ICE doesn't support prflx candidate. 474 // We set candidate type to STUN_PORT_TYPE if the binding request comes 475 // from a relay port or the shared socket is used. Otherwise we use the 476 // port's type as the candidate type. 477 if (port->Type() == RELAY_PORT_TYPE || port->SharedSocket()) { 478 type = STUN_PORT_TYPE; 479 } else { 480 type = port->Type(); 481 } 482 } 483 484 std::string id = talk_base::CreateRandomString(8); 485 new_remote_candidate = Candidate( 486 id, component(), ProtoToString(proto), address, 487 0, remote_username, remote_password, type, 488 port->Network()->name(), 0U, 489 talk_base::ToString<uint32>(talk_base::ComputeCrc32(id))); 490 new_remote_candidate.set_priority( 491 new_remote_candidate.GetPriority(ICE_TYPE_PREFERENCE_SRFLX)); 492 } 493 494 if (protocol_type_ == ICEPROTO_RFC5245) { 495 // RFC 5245 496 // If the source transport address of the request does not match any 497 // existing remote candidates, it represents a new peer reflexive remote 498 // candidate. 499 500 // The priority of the candidate is set to the PRIORITY attribute 501 // from the request. 502 const StunUInt32Attribute* priority_attr = 503 stun_msg->GetUInt32(STUN_ATTR_PRIORITY); 504 if (!priority_attr) { 505 LOG(LS_WARNING) << "P2PTransportChannel::OnUnknownAddress - " 506 << "No STUN_ATTR_PRIORITY found in the " 507 << "stun request message"; 508 port->SendBindingErrorResponse(stun_msg, address, 509 STUN_ERROR_BAD_REQUEST, 510 STUN_ERROR_REASON_BAD_REQUEST); 511 return; 512 } 513 new_remote_candidate.set_priority(priority_attr->value()); 514 515 // RFC5245, the agent constructs a pair whose local candidate is equal to 516 // the transport address on which the STUN request was received, and a 517 // remote candidate equal to the source transport address where the 518 // request came from. 519 520 // There shouldn't be an existing connection with this remote address. 521 ASSERT(port->GetConnection(new_remote_candidate.address()) == NULL); 522 523 Connection* connection = port->CreateConnection( 524 new_remote_candidate, cricket::PortInterface::ORIGIN_THIS_PORT); 525 if (!connection) { 526 ASSERT(false); 527 port->SendBindingErrorResponse(stun_msg, address, 528 STUN_ERROR_SERVER_ERROR, 529 STUN_ERROR_REASON_SERVER_ERROR); 530 return; 531 } 532 533 AddConnection(connection); 534 connection->ReceivedPing(); 535 536 // Send the pinger a successful stun response. 537 port->SendBindingResponse(stun_msg, address); 538 539 // Update the list of connections since we just added another. We do this 540 // after sending the response since it could (in principle) delete the 541 // connection in question. 542 SortConnections(); 543 } else { 544 // Check for connectivity to this address. Create connections 545 // to this address across all local ports. First, add this as a new remote 546 // address 547 if (!CreateConnections(new_remote_candidate, port, true)) { 548 // Hopefully this won't occur, because changing a destination address 549 // shouldn't cause a new connection to fail 550 ASSERT(false); 551 port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR, 552 STUN_ERROR_REASON_SERVER_ERROR); 553 return; 554 } 555 556 // Send the pinger a successful stun response. 557 port->SendBindingResponse(stun_msg, address); 558 559 // Update the list of connections since we just added another. We do this 560 // after sending the response since it could (in principle) delete the 561 // connection in question. 562 SortConnections(); 563 } 564 } 565 566 void P2PTransportChannel::OnRoleConflict(PortInterface* port) { 567 SignalRoleConflict(this); // STUN ping will be sent when SetRole is called 568 // from Transport. 569 } 570 571 // When the signalling channel is ready, we can really kick off the allocator 572 void P2PTransportChannel::OnSignalingReady() { 573 ASSERT(worker_thread_ == talk_base::Thread::Current()); 574 if (waiting_for_signaling_) { 575 waiting_for_signaling_ = false; 576 AddAllocatorSession(allocator_->CreateSession( 577 SessionId(), content_name(), component(), ice_ufrag_, ice_pwd_)); 578 } 579 } 580 581 void P2PTransportChannel::OnUseCandidate(Connection* conn) { 582 ASSERT(worker_thread_ == talk_base::Thread::Current()); 583 ASSERT(ice_role_ == ICEROLE_CONTROLLED); 584 ASSERT(protocol_type_ == ICEPROTO_RFC5245); 585 if (conn->write_state() == Connection::STATE_WRITABLE) { 586 if (best_connection_ != conn) { 587 pending_best_connection_ = NULL; 588 SwitchBestConnectionTo(conn); 589 // Now we have selected the best connection, time to prune other existing 590 // connections and update the read/write state of the channel. 591 RequestSort(); 592 } 593 } else { 594 pending_best_connection_ = conn; 595 } 596 } 597 598 void P2PTransportChannel::OnCandidate(const Candidate& candidate) { 599 ASSERT(worker_thread_ == talk_base::Thread::Current()); 600 601 // Create connections to this remote candidate. 602 CreateConnections(candidate, NULL, false); 603 604 // Resort the connections list, which may have new elements. 605 SortConnections(); 606 } 607 608 // Creates connections from all of the ports that we care about to the given 609 // remote candidate. The return value is true if we created a connection from 610 // the origin port. 611 bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate, 612 PortInterface* origin_port, 613 bool readable) { 614 ASSERT(worker_thread_ == talk_base::Thread::Current()); 615 616 Candidate new_remote_candidate(remote_candidate); 617 new_remote_candidate.set_generation( 618 GetRemoteCandidateGeneration(remote_candidate)); 619 // ICE candidates don't need to have username and password set, but 620 // the code below this (specifically, ConnectionRequest::Prepare in 621 // port.cc) uses the remote candidates's username. So, we set it 622 // here. 623 if (remote_candidate.username().empty()) { 624 new_remote_candidate.set_username(remote_ice_ufrag_); 625 } 626 if (remote_candidate.password().empty()) { 627 new_remote_candidate.set_password(remote_ice_pwd_); 628 } 629 630 // Add a new connection for this candidate to every port that allows such a 631 // connection (i.e., if they have compatible protocols) and that does not 632 // already have a connection to an equivalent candidate. We must be careful 633 // to make sure that the origin port is included, even if it was pruned, 634 // since that may be the only port that can create this connection. 635 636 bool created = false; 637 638 std::vector<PortInterface *>::reverse_iterator it; 639 for (it = ports_.rbegin(); it != ports_.rend(); ++it) { 640 if (CreateConnection(*it, new_remote_candidate, origin_port, readable)) { 641 if (*it == origin_port) 642 created = true; 643 } 644 } 645 646 if ((origin_port != NULL) && 647 std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) { 648 if (CreateConnection( 649 origin_port, new_remote_candidate, origin_port, readable)) 650 created = true; 651 } 652 653 // Remember this remote candidate so that we can add it to future ports. 654 RememberRemoteCandidate(new_remote_candidate, origin_port); 655 656 return created; 657 } 658 659 // Setup a connection object for the local and remote candidate combination. 660 // And then listen to connection object for changes. 661 bool P2PTransportChannel::CreateConnection(PortInterface* port, 662 const Candidate& remote_candidate, 663 PortInterface* origin_port, 664 bool readable) { 665 // Look for an existing connection with this remote address. If one is not 666 // found, then we can create a new connection for this address. 667 Connection* connection = port->GetConnection(remote_candidate.address()); 668 if (connection != NULL) { 669 // It is not legal to try to change any of the parameters of an existing 670 // connection; however, the other side can send a duplicate candidate. 671 if (!remote_candidate.IsEquivalent(connection->remote_candidate())) { 672 LOG(INFO) << "Attempt to change a remote candidate"; 673 return false; 674 } 675 } else { 676 PortInterface::CandidateOrigin origin = GetOrigin(port, origin_port); 677 678 // Don't create connection if this is a candidate we received in a 679 // message and we are not allowed to make outgoing connections. 680 if (origin == cricket::PortInterface::ORIGIN_MESSAGE && incoming_only_) 681 return false; 682 683 connection = port->CreateConnection(remote_candidate, origin); 684 if (!connection) 685 return false; 686 687 AddConnection(connection); 688 689 LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", (" 690 << connections_.size() << " total)"; 691 } 692 693 // If we are readable, it is because we are creating this in response to a 694 // ping from the other side. This will cause the state to become readable. 695 if (readable) 696 connection->ReceivedPing(); 697 698 return true; 699 } 700 701 bool P2PTransportChannel::FindConnection( 702 cricket::Connection* connection) const { 703 std::vector<Connection*>::const_iterator citer = 704 std::find(connections_.begin(), connections_.end(), connection); 705 return citer != connections_.end(); 706 } 707 708 uint32 P2PTransportChannel::GetRemoteCandidateGeneration( 709 const Candidate& candidate) { 710 if (protocol_type_ == ICEPROTO_GOOGLE) { 711 // The Candidate.generation() can be trusted. Nothing needs to be done. 712 return candidate.generation(); 713 } 714 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245. 715 // Therefore we need to keep track of the remote ice restart so 716 // newer connections are prioritized over the older. 717 ASSERT(candidate.generation() == 0 || 718 candidate.generation() == remote_candidate_generation_); 719 return remote_candidate_generation_; 720 } 721 722 // Maintain our remote candidate list, adding this new remote one. 723 void P2PTransportChannel::RememberRemoteCandidate( 724 const Candidate& remote_candidate, PortInterface* origin_port) { 725 // Remove any candidates whose generation is older than this one. The 726 // presence of a new generation indicates that the old ones are not useful. 727 uint32 i = 0; 728 while (i < remote_candidates_.size()) { 729 if (remote_candidates_[i].generation() < remote_candidate.generation()) { 730 LOG(INFO) << "Pruning candidate from old generation: " 731 << remote_candidates_[i].address().ToSensitiveString(); 732 remote_candidates_.erase(remote_candidates_.begin() + i); 733 } else { 734 i += 1; 735 } 736 } 737 738 // Make sure this candidate is not a duplicate. 739 for (uint32 i = 0; i < remote_candidates_.size(); ++i) { 740 if (remote_candidates_[i].IsEquivalent(remote_candidate)) { 741 LOG(INFO) << "Duplicate candidate: " 742 << remote_candidate.address().ToSensitiveString(); 743 return; 744 } 745 } 746 747 // Try this candidate for all future ports. 748 remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port)); 749 } 750 751 // Set options on ourselves is simply setting options on all of our available 752 // port objects. 753 int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) { 754 OptionMap::iterator it = options_.find(opt); 755 if (it == options_.end()) { 756 options_.insert(std::make_pair(opt, value)); 757 } else if (it->second == value) { 758 return 0; 759 } else { 760 it->second = value; 761 } 762 763 for (uint32 i = 0; i < ports_.size(); ++i) { 764 int val = ports_[i]->SetOption(opt, value); 765 if (val < 0) { 766 // Because this also occurs deferred, probably no point in reporting an 767 // error 768 LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: " 769 << ports_[i]->GetError(); 770 } 771 } 772 return 0; 773 } 774 775 // Send data to the other side, using our best connection. 776 int P2PTransportChannel::SendPacket(const char *data, size_t len, int flags) { 777 ASSERT(worker_thread_ == talk_base::Thread::Current()); 778 if (flags != 0) { 779 error_ = EINVAL; 780 return -1; 781 } 782 if (best_connection_ == NULL) { 783 error_ = EWOULDBLOCK; 784 return -1; 785 } 786 int sent = best_connection_->Send(data, len); 787 if (sent <= 0) { 788 ASSERT(sent < 0); 789 error_ = best_connection_->GetError(); 790 } 791 return sent; 792 } 793 794 bool P2PTransportChannel::GetStats(ConnectionInfos *infos) { 795 ASSERT(worker_thread_ == talk_base::Thread::Current()); 796 // Gather connection infos. 797 infos->clear(); 798 799 std::vector<Connection *>::const_iterator it; 800 for (it = connections_.begin(); it != connections_.end(); ++it) { 801 Connection *connection = *it; 802 ConnectionInfo info; 803 info.best_connection = (best_connection_ == connection); 804 info.readable = 805 (connection->read_state() == Connection::STATE_READABLE); 806 info.writable = 807 (connection->write_state() == Connection::STATE_WRITABLE); 808 info.timeout = 809 (connection->write_state() == Connection::STATE_WRITE_TIMEOUT); 810 info.new_connection = !connection->reported(); 811 connection->set_reported(true); 812 info.rtt = connection->rtt(); 813 info.sent_total_bytes = connection->sent_total_bytes(); 814 info.sent_bytes_second = connection->sent_bytes_second(); 815 info.recv_total_bytes = connection->recv_total_bytes(); 816 info.recv_bytes_second = connection->recv_bytes_second(); 817 info.local_candidate = connection->local_candidate(); 818 info.remote_candidate = connection->remote_candidate(); 819 info.key = connection; 820 infos->push_back(info); 821 } 822 823 return true; 824 } 825 826 // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending) 827 void P2PTransportChannel::Allocate() { 828 // Time for a new allocator, lets make sure we have a signalling channel 829 // to communicate candidates through first. 830 waiting_for_signaling_ = true; 831 SignalRequestSignaling(this); 832 } 833 834 // Monitor connection states. 835 void P2PTransportChannel::UpdateConnectionStates() { 836 uint32 now = talk_base::Time(); 837 838 // We need to copy the list of connections since some may delete themselves 839 // when we call UpdateState. 840 for (uint32 i = 0; i < connections_.size(); ++i) 841 connections_[i]->UpdateState(now); 842 } 843 844 // Prepare for best candidate sorting. 845 void P2PTransportChannel::RequestSort() { 846 if (!sort_dirty_) { 847 worker_thread_->Post(this, MSG_SORT); 848 sort_dirty_ = true; 849 } 850 } 851 852 // Sort the available connections to find the best one. We also monitor 853 // the number of available connections and the current state. 854 void P2PTransportChannel::SortConnections() { 855 ASSERT(worker_thread_ == talk_base::Thread::Current()); 856 857 // Make sure the connection states are up-to-date since this affects how they 858 // will be sorted. 859 UpdateConnectionStates(); 860 861 // Any changes after this point will require a re-sort. 862 sort_dirty_ = false; 863 864 // Get a list of the networks that we are using. 865 std::set<talk_base::Network*> networks; 866 for (uint32 i = 0; i < connections_.size(); ++i) 867 networks.insert(connections_[i]->port()->Network()); 868 869 // Find the best alternative connection by sorting. It is important to note 870 // that amongst equal preference, writable connections, this will choose the 871 // one whose estimated latency is lowest. So it is the only one that we 872 // need to consider switching to. 873 874 ConnectionCompare cmp; 875 std::stable_sort(connections_.begin(), connections_.end(), cmp); 876 LOG(LS_VERBOSE) << "Sorting available connections:"; 877 for (uint32 i = 0; i < connections_.size(); ++i) { 878 LOG(LS_VERBOSE) << connections_[i]->ToString(); 879 } 880 881 Connection* top_connection = NULL; 882 if (connections_.size() > 0) 883 top_connection = connections_[0]; 884 885 // We don't want to pick the best connections if channel is using RFC5245 886 // and it's mode is CONTROLLED, as connections will be selected by the 887 // CONTROLLING agent. 888 889 // If necessary, switch to the new choice. 890 if (protocol_type_ != ICEPROTO_RFC5245 || ice_role_ == ICEROLE_CONTROLLING) { 891 if (ShouldSwitch(best_connection_, top_connection)) 892 SwitchBestConnectionTo(top_connection); 893 } 894 895 // We can prune any connection for which there is a writable connection on 896 // the same network with better or equal priority. We leave those with 897 // better priority just in case they become writable later (at which point, 898 // we would prune out the current best connection). We leave connections on 899 // other networks because they may not be using the same resources and they 900 // may represent very distinct paths over which we can switch. 901 std::set<talk_base::Network*>::iterator network; 902 for (network = networks.begin(); network != networks.end(); ++network) { 903 Connection* primier = GetBestConnectionOnNetwork(*network); 904 if (!primier || (primier->write_state() != Connection::STATE_WRITABLE)) 905 continue; 906 907 for (uint32 i = 0; i < connections_.size(); ++i) { 908 if ((connections_[i] != primier) && 909 (connections_[i]->port()->Network() == *network) && 910 (CompareConnectionCandidates(primier, connections_[i]) >= 0)) { 911 connections_[i]->Prune(); 912 } 913 } 914 } 915 916 // Check if all connections are timedout. 917 bool all_connections_timedout = true; 918 for (uint32 i = 0; i < connections_.size(); ++i) { 919 if (connections_[i]->write_state() != Connection::STATE_WRITE_TIMEOUT) { 920 all_connections_timedout = false; 921 break; 922 } 923 } 924 925 // Now update the writable state of the channel with the information we have 926 // so far. 927 if (best_connection_ && best_connection_->writable()) { 928 HandleWritable(); 929 } else if (all_connections_timedout) { 930 HandleAllTimedOut(); 931 } else { 932 HandleNotWritable(); 933 } 934 935 // Update the state of this channel. This method is called whenever the 936 // state of any connection changes, so this is a good place to do this. 937 UpdateChannelState(); 938 } 939 940 941 // Track the best connection, and let listeners know 942 void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) { 943 // Note: if conn is NULL, the previous best_connection_ has been destroyed, 944 // so don't use it. 945 Connection* old_best_connection = best_connection_; 946 best_connection_ = conn; 947 if (best_connection_) { 948 if (old_best_connection) { 949 LOG_J(LS_INFO, this) << "Previous best connection: " 950 << old_best_connection->ToString(); 951 } 952 LOG_J(LS_INFO, this) << "New best connection: " 953 << best_connection_->ToString(); 954 SignalRouteChange(this, best_connection_->remote_candidate()); 955 } else { 956 LOG_J(LS_INFO, this) << "No best connection"; 957 } 958 } 959 960 void P2PTransportChannel::UpdateChannelState() { 961 // The Handle* functions already set the writable state. We'll just double- 962 // check it here. 963 bool writable = ((best_connection_ != NULL) && 964 (best_connection_->write_state() == 965 Connection::STATE_WRITABLE)); 966 ASSERT(writable == this->writable()); 967 if (writable != this->writable()) 968 LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch"; 969 970 bool readable = false; 971 for (uint32 i = 0; i < connections_.size(); ++i) { 972 if (connections_[i]->read_state() == Connection::STATE_READABLE) 973 readable = true; 974 } 975 set_readable(readable); 976 } 977 978 // We checked the status of our connections and we had at least one that 979 // was writable, go into the writable state. 980 void P2PTransportChannel::HandleWritable() { 981 ASSERT(worker_thread_ == talk_base::Thread::Current()); 982 if (!writable()) { 983 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) { 984 if (allocator_sessions_[i]->IsGettingPorts()) { 985 allocator_sessions_[i]->StopGettingPorts(); 986 } 987 } 988 } 989 990 was_writable_ = true; 991 set_writable(true); 992 } 993 994 // Notify upper layer about channel not writable state, if it was before. 995 void P2PTransportChannel::HandleNotWritable() { 996 ASSERT(worker_thread_ == talk_base::Thread::Current()); 997 if (was_writable_) { 998 was_writable_ = false; 999 set_writable(false); 1000 } 1001 } 1002 1003 void P2PTransportChannel::HandleAllTimedOut() { 1004 // Currently we are treating this as channel not writable. 1005 HandleNotWritable(); 1006 } 1007 1008 // If we have a best connection, return it, otherwise return top one in the 1009 // list (later we will mark it best). 1010 Connection* P2PTransportChannel::GetBestConnectionOnNetwork( 1011 talk_base::Network* network) { 1012 // If the best connection is on this network, then it wins. 1013 if (best_connection_ && (best_connection_->port()->Network() == network)) 1014 return best_connection_; 1015 1016 // Otherwise, we return the top-most in sorted order. 1017 for (uint32 i = 0; i < connections_.size(); ++i) { 1018 if (connections_[i]->port()->Network() == network) 1019 return connections_[i]; 1020 } 1021 1022 return NULL; 1023 } 1024 1025 // Handle any queued up requests 1026 void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) { 1027 switch (pmsg->message_id) { 1028 case MSG_SORT: 1029 OnSort(); 1030 break; 1031 case MSG_PING: 1032 OnPing(); 1033 break; 1034 default: 1035 ASSERT(false); 1036 break; 1037 } 1038 } 1039 1040 // Handle queued up sort request 1041 void P2PTransportChannel::OnSort() { 1042 // Resort the connections based on the new statistics. 1043 SortConnections(); 1044 } 1045 1046 // Handle queued up ping request 1047 void P2PTransportChannel::OnPing() { 1048 // Make sure the states of the connections are up-to-date (since this affects 1049 // which ones are pingable). 1050 UpdateConnectionStates(); 1051 1052 // Find the oldest pingable connection and have it do a ping. 1053 Connection* conn = FindNextPingableConnection(); 1054 if (conn) 1055 PingConnection(conn); 1056 1057 // Post ourselves a message to perform the next ping. 1058 uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY; 1059 thread()->PostDelayed(delay, this, MSG_PING); 1060 } 1061 1062 // Is the connection in a state for us to even consider pinging the other side? 1063 bool P2PTransportChannel::IsPingable(Connection* conn) { 1064 // An unconnected connection cannot be written to at all, so pinging is out 1065 // of the question. 1066 if (!conn->connected()) 1067 return false; 1068 1069 if (writable()) { 1070 // If we are writable, then we only want to ping connections that could be 1071 // better than this one, i.e., the ones that were not pruned. 1072 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT); 1073 } else { 1074 // If we are not writable, then we need to try everything that might work. 1075 // This includes both connections that do not have write timeout as well as 1076 // ones that do not have read timeout. A connection could be readable but 1077 // be in write-timeout if we pruned it before. Since the other side is 1078 // still pinging it, it very well might still work. 1079 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) || 1080 (conn->read_state() != Connection::STATE_READ_TIMEOUT); 1081 } 1082 } 1083 1084 // Returns the next pingable connection to ping. This will be the oldest 1085 // pingable connection unless we have a writable connection that is past the 1086 // maximum acceptable ping delay. 1087 Connection* P2PTransportChannel::FindNextPingableConnection() { 1088 uint32 now = talk_base::Time(); 1089 if (best_connection_ && 1090 (best_connection_->write_state() == Connection::STATE_WRITABLE) && 1091 (best_connection_->last_ping_sent() 1092 + MAX_CURRENT_WRITABLE_DELAY <= now)) { 1093 return best_connection_; 1094 } 1095 1096 Connection* oldest_conn = NULL; 1097 uint32 oldest_time = 0xFFFFFFFF; 1098 for (uint32 i = 0; i < connections_.size(); ++i) { 1099 if (IsPingable(connections_[i])) { 1100 if (connections_[i]->last_ping_sent() < oldest_time) { 1101 oldest_time = connections_[i]->last_ping_sent(); 1102 oldest_conn = connections_[i]; 1103 } 1104 } 1105 } 1106 return oldest_conn; 1107 } 1108 1109 // Apart from sending ping from |conn| this method also updates 1110 // |use_candidate_attr| flag. The criteria to update this flag is 1111 // explained below. 1112 // Set USE-CANDIDATE if doing ICE AND this channel is in CONTROLLING AND 1113 // a) Channel is in FULL ICE AND 1114 // a.1) |conn| is the best connection OR 1115 // a.2) there is no best connection OR 1116 // a.3) the best connection is unwritable OR 1117 // a.4) |conn| has higher priority than best_connection. 1118 // b) we're doing LITE ICE AND 1119 // b.1) |conn| is the best_connection AND 1120 // b.2) |conn| is writable. 1121 void P2PTransportChannel::PingConnection(Connection* conn) { 1122 bool use_candidate = false; 1123 if (protocol_type_ == ICEPROTO_RFC5245) { 1124 if (remote_ice_mode_ == ICEMODE_FULL && ice_role_ == ICEROLE_CONTROLLING) { 1125 use_candidate = (conn == best_connection_) || 1126 (best_connection_ == NULL) || 1127 (!best_connection_->writable()) || 1128 (conn->priority() > best_connection_->priority()); 1129 } else if (remote_ice_mode_ == ICEMODE_LITE && conn == best_connection_) { 1130 use_candidate = best_connection_->writable(); 1131 } 1132 } 1133 conn->set_use_candidate_attr(use_candidate); 1134 conn->Ping(talk_base::Time()); 1135 } 1136 1137 // When a connection's state changes, we need to figure out who to use as 1138 // the best connection again. It could have become usable, or become unusable. 1139 void P2PTransportChannel::OnConnectionStateChange(Connection* connection) { 1140 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1141 1142 // Update the best connection if the state change is from pending best 1143 // connection and role is controlled. 1144 if (protocol_type_ == ICEPROTO_RFC5245 && ice_role_ == ICEROLE_CONTROLLED) { 1145 if (connection == pending_best_connection_ && connection->writable()) { 1146 pending_best_connection_ = NULL; 1147 SwitchBestConnectionTo(connection); 1148 } 1149 } 1150 1151 // We have to unroll the stack before doing this because we may be changing 1152 // the state of connections while sorting. 1153 RequestSort(); 1154 } 1155 1156 // When a connection is removed, edit it out, and then update our best 1157 // connection. 1158 void P2PTransportChannel::OnConnectionDestroyed(Connection* connection) { 1159 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1160 1161 // Note: the previous best_connection_ may be destroyed by now, so don't 1162 // use it. 1163 1164 // Remove this connection from the list. 1165 std::vector<Connection*>::iterator iter = 1166 std::find(connections_.begin(), connections_.end(), connection); 1167 ASSERT(iter != connections_.end()); 1168 connections_.erase(iter); 1169 1170 LOG_J(LS_INFO, this) << "Removed connection (" 1171 << static_cast<int>(connections_.size()) << " remaining)"; 1172 1173 if (pending_best_connection_ == connection) { 1174 pending_best_connection_ = NULL; 1175 } 1176 1177 // If this is currently the best connection, then we need to pick a new one. 1178 // The call to SortConnections will pick a new one. It looks at the current 1179 // best connection in order to avoid switching between fairly similar ones. 1180 // Since this connection is no longer an option, we can just set best to NULL 1181 // and re-choose a best assuming that there was no best connection. 1182 if (best_connection_ == connection) { 1183 SwitchBestConnectionTo(NULL); 1184 RequestSort(); 1185 } 1186 } 1187 1188 // When a port is destroyed remove it from our list of ports to use for 1189 // connection attempts. 1190 void P2PTransportChannel::OnPortDestroyed(PortInterface* port) { 1191 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1192 1193 // Remove this port from the list (if we didn't drop it already). 1194 std::vector<PortInterface*>::iterator iter = 1195 std::find(ports_.begin(), ports_.end(), port); 1196 if (iter != ports_.end()) 1197 ports_.erase(iter); 1198 1199 LOG(INFO) << "Removed port from p2p socket: " 1200 << static_cast<int>(ports_.size()) << " remaining"; 1201 } 1202 1203 // We data is available, let listeners know 1204 void P2PTransportChannel::OnReadPacket(Connection *connection, const char *data, 1205 size_t len) { 1206 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1207 1208 // Do not deliver, if packet doesn't belong to the correct transport channel. 1209 if (!FindConnection(connection)) 1210 return; 1211 1212 // Let the client know of an incoming packet 1213 SignalReadPacket(this, data, len, 0); 1214 } 1215 1216 void P2PTransportChannel::OnReadyToSend(Connection* connection) { 1217 if (connection == best_connection_ && writable()) { 1218 SignalReadyToSend(this); 1219 } 1220 } 1221 1222 } // namespace cricket 1223