1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/p2ptransportchannel.h" 29 30 #include <set> 31 #include "talk/base/common.h" 32 #include "talk/base/crc32.h" 33 #include "talk/base/logging.h" 34 #include "talk/base/stringencode.h" 35 #include "talk/p2p/base/common.h" 36 #include "talk/p2p/base/relayport.h" // For RELAY_PORT_TYPE. 37 #include "talk/p2p/base/stunport.h" // For STUN_PORT_TYPE. 38 39 namespace { 40 41 // messages for queuing up work for ourselves 42 enum { 43 MSG_SORT = 1, 44 MSG_PING, 45 }; 46 47 // When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers) 48 // for pinging. When the socket is writable, we will use only 1 Kbps because 49 // we don't want to degrade the quality on a modem. These numbers should work 50 // well on a 28.8K modem, which is the slowest connection on which the voice 51 // quality is reasonable at all. 52 static const uint32 PING_PACKET_SIZE = 60 * 8; 53 static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480ms 54 static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000; // 50ms 55 56 // If there is a current writable connection, then we will also try hard to 57 // make sure it is pinged at this rate. 58 static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit 59 60 // The minimum improvement in RTT that justifies a switch. 61 static const double kMinImprovement = 10; 62 63 cricket::PortInterface::CandidateOrigin GetOrigin(cricket::PortInterface* port, 64 cricket::PortInterface* origin_port) { 65 if (!origin_port) 66 return cricket::PortInterface::ORIGIN_MESSAGE; 67 else if (port == origin_port) 68 return cricket::PortInterface::ORIGIN_THIS_PORT; 69 else 70 return cricket::PortInterface::ORIGIN_OTHER_PORT; 71 } 72 73 // Compares two connections based only on static information about them. 74 int CompareConnectionCandidates(cricket::Connection* a, 75 cricket::Connection* b) { 76 // Compare connection priority. Lower values get sorted last. 77 if (a->priority() > b->priority()) 78 return 1; 79 if (a->priority() < b->priority()) 80 return -1; 81 82 // If we're still tied at this point, prefer a younger generation. 83 return (a->remote_candidate().generation() + a->port()->generation()) - 84 (b->remote_candidate().generation() + b->port()->generation()); 85 } 86 87 // Compare two connections based on their writability and static preferences. 88 int CompareConnections(cricket::Connection *a, cricket::Connection *b) { 89 // Sort based on write-state. Better states have lower values. 90 if (a->write_state() < b->write_state()) 91 return 1; 92 if (a->write_state() > b->write_state()) 93 return -1; 94 95 // Compare the candidate information. 96 return CompareConnectionCandidates(a, b); 97 } 98 99 // Wraps the comparison connection into a less than operator that puts higher 100 // priority writable connections first. 101 class ConnectionCompare { 102 public: 103 bool operator()(const cricket::Connection *ca, 104 const cricket::Connection *cb) { 105 cricket::Connection* a = const_cast<cricket::Connection*>(ca); 106 cricket::Connection* b = const_cast<cricket::Connection*>(cb); 107 108 ASSERT(a->port()->IceProtocol() == b->port()->IceProtocol()); 109 110 // Compare first on writability and static preferences. 111 int cmp = CompareConnections(a, b); 112 if (cmp > 0) 113 return true; 114 if (cmp < 0) 115 return false; 116 117 // Otherwise, sort based on latency estimate. 118 return a->rtt() < b->rtt(); 119 120 // Should we bother checking for the last connection that last received 121 // data? It would help rendezvous on the connection that is also receiving 122 // packets. 123 // 124 // TODO: Yes we should definitely do this. The TCP protocol gains 125 // efficiency by being used bidirectionally, as opposed to two separate 126 // unidirectional streams. This test should probably occur before 127 // comparison of local prefs (assuming combined prefs are the same). We 128 // need to be careful though, not to bounce back and forth with both sides 129 // trying to rendevous with the other. 130 } 131 }; 132 133 // Determines whether we should switch between two connections, based first on 134 // static preferences and then (if those are equal) on latency estimates. 135 bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) { 136 if (a_conn == b_conn) 137 return false; 138 139 if (!a_conn || !b_conn) // don't think the latter should happen 140 return true; 141 142 int prefs_cmp = CompareConnections(a_conn, b_conn); 143 if (prefs_cmp < 0) 144 return true; 145 if (prefs_cmp > 0) 146 return false; 147 148 return b_conn->rtt() <= a_conn->rtt() + kMinImprovement; 149 } 150 151 } // unnamed namespace 152 153 namespace cricket { 154 155 P2PTransportChannel::P2PTransportChannel(const std::string& content_name, 156 int component, 157 P2PTransport* transport, 158 PortAllocator *allocator) : 159 TransportChannelImpl(content_name, component), 160 transport_(transport), 161 allocator_(allocator), 162 worker_thread_(talk_base::Thread::Current()), 163 incoming_only_(false), 164 waiting_for_signaling_(false), 165 error_(0), 166 best_connection_(NULL), 167 pending_best_connection_(NULL), 168 sort_dirty_(false), 169 was_writable_(false), 170 protocol_type_(ICEPROTO_GOOGLE), 171 remote_ice_mode_(ICEMODE_FULL), 172 ice_role_(ICEROLE_UNKNOWN), 173 tiebreaker_(0), 174 remote_candidate_generation_(0) { 175 } 176 177 P2PTransportChannel::~P2PTransportChannel() { 178 ASSERT(worker_thread_ == talk_base::Thread::Current()); 179 180 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 181 delete allocator_sessions_[i]; 182 } 183 184 // Add the allocator session to our list so that we know which sessions 185 // are still active. 186 void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) { 187 session->set_generation(static_cast<uint32>(allocator_sessions_.size())); 188 allocator_sessions_.push_back(session); 189 190 // We now only want to apply new candidates that we receive to the ports 191 // created by this new session because these are replacing those of the 192 // previous sessions. 193 ports_.clear(); 194 195 session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady); 196 session->SignalCandidatesReady.connect( 197 this, &P2PTransportChannel::OnCandidatesReady); 198 session->SignalCandidatesAllocationDone.connect( 199 this, &P2PTransportChannel::OnCandidatesAllocationDone); 200 session->StartGettingPorts(); 201 } 202 203 void P2PTransportChannel::AddConnection(Connection* connection) { 204 connections_.push_back(connection); 205 connection->set_remote_ice_mode(remote_ice_mode_); 206 connection->SignalReadPacket.connect( 207 this, &P2PTransportChannel::OnReadPacket); 208 connection->SignalReadyToSend.connect( 209 this, &P2PTransportChannel::OnReadyToSend); 210 connection->SignalStateChange.connect( 211 this, &P2PTransportChannel::OnConnectionStateChange); 212 connection->SignalDestroyed.connect( 213 this, &P2PTransportChannel::OnConnectionDestroyed); 214 connection->SignalUseCandidate.connect( 215 this, &P2PTransportChannel::OnUseCandidate); 216 } 217 218 void P2PTransportChannel::SetIceRole(IceRole ice_role) { 219 ASSERT(worker_thread_ == talk_base::Thread::Current()); 220 if (ice_role_ != ice_role) { 221 ice_role_ = ice_role; 222 for (std::vector<PortInterface *>::iterator it = ports_.begin(); 223 it != ports_.end(); ++it) { 224 (*it)->SetIceRole(ice_role); 225 } 226 } 227 } 228 229 void P2PTransportChannel::SetIceTiebreaker(uint64 tiebreaker) { 230 ASSERT(worker_thread_ == talk_base::Thread::Current()); 231 if (!ports_.empty()) { 232 LOG(LS_ERROR) 233 << "Attempt to change tiebreaker after Port has been allocated."; 234 return; 235 } 236 237 tiebreaker_ = tiebreaker; 238 } 239 240 void P2PTransportChannel::SetIceProtocolType(IceProtocolType type) { 241 ASSERT(worker_thread_ == talk_base::Thread::Current()); 242 243 protocol_type_ = type; 244 for (std::vector<PortInterface *>::iterator it = ports_.begin(); 245 it != ports_.end(); ++it) { 246 (*it)->SetIceProtocolType(protocol_type_); 247 } 248 } 249 250 void P2PTransportChannel::SetIceCredentials(const std::string& ice_ufrag, 251 const std::string& ice_pwd) { 252 ASSERT(worker_thread_ == talk_base::Thread::Current()); 253 bool ice_restart = false; 254 if (!ice_ufrag_.empty() && !ice_pwd_.empty()) { 255 // Restart candidate allocation if there is any change in either 256 // ice ufrag or password. 257 ice_restart = (ice_ufrag_ != ice_ufrag) || (ice_pwd_!= ice_pwd); 258 } 259 260 ice_ufrag_ = ice_ufrag; 261 ice_pwd_ = ice_pwd; 262 263 if (ice_restart) { 264 // Restart candidate gathering. 265 Allocate(); 266 } 267 } 268 269 void P2PTransportChannel::SetRemoteIceCredentials(const std::string& ice_ufrag, 270 const std::string& ice_pwd) { 271 ASSERT(worker_thread_ == talk_base::Thread::Current()); 272 bool ice_restart = false; 273 if (!remote_ice_ufrag_.empty() && !remote_ice_pwd_.empty()) { 274 ice_restart = (remote_ice_ufrag_ != ice_ufrag) || 275 (remote_ice_pwd_!= ice_pwd); 276 } 277 278 remote_ice_ufrag_ = ice_ufrag; 279 remote_ice_pwd_ = ice_pwd; 280 281 if (ice_restart) { 282 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245. 283 // Therefore we need to keep track of the remote ice restart so 284 // newer connections are prioritized over the older. 285 ++remote_candidate_generation_; 286 } 287 } 288 289 void P2PTransportChannel::SetRemoteIceMode(IceMode mode) { 290 remote_ice_mode_ = mode; 291 } 292 293 // Go into the state of processing candidates, and running in general 294 void P2PTransportChannel::Connect() { 295 ASSERT(worker_thread_ == talk_base::Thread::Current()); 296 if (ice_ufrag_.empty() || ice_pwd_.empty()) { 297 ASSERT(false); 298 LOG(LS_ERROR) << "P2PTransportChannel::Connect: The ice_ufrag_ and the " 299 << "ice_pwd_ are not set."; 300 return; 301 } 302 303 // Kick off an allocator session 304 Allocate(); 305 306 // Start pinging as the ports come in. 307 thread()->Post(this, MSG_PING); 308 } 309 310 // Reset the socket, clear up any previous allocations and start over 311 void P2PTransportChannel::Reset() { 312 ASSERT(worker_thread_ == talk_base::Thread::Current()); 313 314 // Get rid of all the old allocators. This should clean up everything. 315 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 316 delete allocator_sessions_[i]; 317 318 allocator_sessions_.clear(); 319 ports_.clear(); 320 connections_.clear(); 321 best_connection_ = NULL; 322 323 // Forget about all of the candidates we got before. 324 remote_candidates_.clear(); 325 326 // Revert to the initial state. 327 set_readable(false); 328 set_writable(false); 329 330 // Reinitialize the rest of our state. 331 waiting_for_signaling_ = false; 332 sort_dirty_ = false; 333 334 // If we allocated before, start a new one now. 335 if (transport_->connect_requested()) 336 Allocate(); 337 338 // Start pinging as the ports come in. 339 thread()->Clear(this); 340 thread()->Post(this, MSG_PING); 341 } 342 343 // A new port is available, attempt to make connections for it 344 void P2PTransportChannel::OnPortReady(PortAllocatorSession *session, 345 PortInterface* port) { 346 ASSERT(worker_thread_ == talk_base::Thread::Current()); 347 348 // Set in-effect options on the new port 349 for (OptionMap::const_iterator it = options_.begin(); 350 it != options_.end(); 351 ++it) { 352 int val = port->SetOption(it->first, it->second); 353 if (val < 0) { 354 LOG_J(LS_WARNING, port) << "SetOption(" << it->first 355 << ", " << it->second 356 << ") failed: " << port->GetError(); 357 } 358 } 359 360 // Remember the ports and candidates, and signal that candidates are ready. 361 // The session will handle this, and send an initiate/accept/modify message 362 // if one is pending. 363 364 port->SetIceProtocolType(protocol_type_); 365 port->SetIceRole(ice_role_); 366 port->SetIceTiebreaker(tiebreaker_); 367 ports_.push_back(port); 368 port->SignalUnknownAddress.connect( 369 this, &P2PTransportChannel::OnUnknownAddress); 370 port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed); 371 port->SignalRoleConflict.connect( 372 this, &P2PTransportChannel::OnRoleConflict); 373 374 // Attempt to create a connection from this new port to all of the remote 375 // candidates that we were given so far. 376 377 std::vector<RemoteCandidate>::iterator iter; 378 for (iter = remote_candidates_.begin(); iter != remote_candidates_.end(); 379 ++iter) { 380 CreateConnection(port, *iter, iter->origin_port(), false); 381 } 382 383 SortConnections(); 384 } 385 386 // A new candidate is available, let listeners know 387 void P2PTransportChannel::OnCandidatesReady( 388 PortAllocatorSession *session, const std::vector<Candidate>& candidates) { 389 ASSERT(worker_thread_ == talk_base::Thread::Current()); 390 for (size_t i = 0; i < candidates.size(); ++i) { 391 SignalCandidateReady(this, candidates[i]); 392 } 393 } 394 395 void P2PTransportChannel::OnCandidatesAllocationDone( 396 PortAllocatorSession* session) { 397 ASSERT(worker_thread_ == talk_base::Thread::Current()); 398 SignalCandidatesAllocationDone(this); 399 } 400 401 // Handle stun packets 402 void P2PTransportChannel::OnUnknownAddress( 403 PortInterface* port, 404 const talk_base::SocketAddress& address, ProtocolType proto, 405 IceMessage* stun_msg, const std::string &remote_username, 406 bool port_muxed) { 407 ASSERT(worker_thread_ == talk_base::Thread::Current()); 408 409 // Port has received a valid stun packet from an address that no Connection 410 // is currently available for. See if we already have a candidate with the 411 // address. If it isn't we need to create new candidate for it. 412 413 // Determine if the remote candidates use shared ufrag. 414 bool ufrag_per_port = false; 415 std::vector<RemoteCandidate>::iterator it; 416 if (remote_candidates_.size() > 0) { 417 it = remote_candidates_.begin(); 418 std::string username = it->username(); 419 for (; it != remote_candidates_.end(); ++it) { 420 if (it->username() != username) { 421 ufrag_per_port = true; 422 break; 423 } 424 } 425 } 426 427 const Candidate* candidate = NULL; 428 bool known_username = false; 429 std::string remote_password; 430 for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) { 431 if (it->username() == remote_username) { 432 remote_password = it->password(); 433 known_username = true; 434 if (ufrag_per_port || 435 (it->address() == address && 436 it->protocol() == ProtoToString(proto))) { 437 candidate = &(*it); 438 break; 439 } 440 // We don't want to break here because we may find a match of the address 441 // later. 442 } 443 } 444 445 if (!known_username) { 446 if (port_muxed) { 447 // When Ports are muxed, SignalUnknownAddress is delivered to all 448 // P2PTransportChannel belong to a session. Return from here will 449 // save us from sending stun binding error message from incorrect channel. 450 return; 451 } 452 // Don't know about this username, the request is bogus 453 // This sometimes happens if a binding response comes in before the ACCEPT 454 // message. It is totally valid; the retry state machine will try again. 455 port->SendBindingErrorResponse(stun_msg, address, 456 STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS); 457 return; 458 } 459 460 Candidate new_remote_candidate; 461 if (candidate != NULL) { 462 new_remote_candidate = *candidate; 463 if (ufrag_per_port) { 464 new_remote_candidate.set_address(address); 465 } 466 } else { 467 // Create a new candidate with this address. 468 469 std::string type; 470 if (protocol_type_ == ICEPROTO_RFC5245) { 471 type = PRFLX_PORT_TYPE; 472 } else { 473 // G-ICE doesn't support prflx candidate. 474 // We set candidate type to STUN_PORT_TYPE if the binding request comes 475 // from a relay port or the shared socket is used. Otherwise we use the 476 // port's type as the candidate type. 477 if (port->Type() == RELAY_PORT_TYPE || port->SharedSocket()) { 478 type = STUN_PORT_TYPE; 479 } else { 480 type = port->Type(); 481 } 482 } 483 484 std::string id = talk_base::CreateRandomString(8); 485 new_remote_candidate = Candidate( 486 id, component(), ProtoToString(proto), address, 487 0, remote_username, remote_password, type, 488 port->Network()->name(), 0U, 489 talk_base::ToString<uint32>(talk_base::ComputeCrc32(id))); 490 new_remote_candidate.set_priority( 491 new_remote_candidate.GetPriority(ICE_TYPE_PREFERENCE_SRFLX)); 492 } 493 494 if (protocol_type_ == ICEPROTO_RFC5245) { 495 // RFC 5245 496 // If the source transport address of the request does not match any 497 // existing remote candidates, it represents a new peer reflexive remote 498 // candidate. 499 500 // The priority of the candidate is set to the PRIORITY attribute 501 // from the request. 502 const StunUInt32Attribute* priority_attr = 503 stun_msg->GetUInt32(STUN_ATTR_PRIORITY); 504 if (!priority_attr) { 505 LOG(LS_WARNING) << "P2PTransportChannel::OnUnknownAddress - " 506 << "No STUN_ATTR_PRIORITY found in the " 507 << "stun request message"; 508 port->SendBindingErrorResponse(stun_msg, address, 509 STUN_ERROR_BAD_REQUEST, 510 STUN_ERROR_REASON_BAD_REQUEST); 511 return; 512 } 513 new_remote_candidate.set_priority(priority_attr->value()); 514 515 // RFC5245, the agent constructs a pair whose local candidate is equal to 516 // the transport address on which the STUN request was received, and a 517 // remote candidate equal to the source transport address where the 518 // request came from. 519 520 // There shouldn't be an existing connection with this remote address. 521 // When ports are muxed, this channel might get multiple unknown address 522 // signals. In that case if the connection is already exists, we should 523 // simply ignore the signal othewise send server error. 524 if (port->GetConnection(new_remote_candidate.address())) { 525 if (port_muxed) { 526 LOG(LS_INFO) << "Connection already exists for peer reflexive " 527 << "candidate: " << new_remote_candidate.ToString(); 528 return; 529 } else { 530 ASSERT(false); 531 port->SendBindingErrorResponse(stun_msg, address, 532 STUN_ERROR_SERVER_ERROR, 533 STUN_ERROR_REASON_SERVER_ERROR); 534 return; 535 } 536 } 537 538 Connection* connection = port->CreateConnection( 539 new_remote_candidate, cricket::PortInterface::ORIGIN_THIS_PORT); 540 if (!connection) { 541 ASSERT(false); 542 port->SendBindingErrorResponse(stun_msg, address, 543 STUN_ERROR_SERVER_ERROR, 544 STUN_ERROR_REASON_SERVER_ERROR); 545 return; 546 } 547 548 AddConnection(connection); 549 connection->ReceivedPing(); 550 551 // Send the pinger a successful stun response. 552 port->SendBindingResponse(stun_msg, address); 553 554 // Update the list of connections since we just added another. We do this 555 // after sending the response since it could (in principle) delete the 556 // connection in question. 557 SortConnections(); 558 } else { 559 // Check for connectivity to this address. Create connections 560 // to this address across all local ports. First, add this as a new remote 561 // address 562 if (!CreateConnections(new_remote_candidate, port, true)) { 563 // Hopefully this won't occur, because changing a destination address 564 // shouldn't cause a new connection to fail 565 ASSERT(false); 566 port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR, 567 STUN_ERROR_REASON_SERVER_ERROR); 568 return; 569 } 570 571 // Send the pinger a successful stun response. 572 port->SendBindingResponse(stun_msg, address); 573 574 // Update the list of connections since we just added another. We do this 575 // after sending the response since it could (in principle) delete the 576 // connection in question. 577 SortConnections(); 578 } 579 } 580 581 void P2PTransportChannel::OnRoleConflict(PortInterface* port) { 582 SignalRoleConflict(this); // STUN ping will be sent when SetRole is called 583 // from Transport. 584 } 585 586 // When the signalling channel is ready, we can really kick off the allocator 587 void P2PTransportChannel::OnSignalingReady() { 588 ASSERT(worker_thread_ == talk_base::Thread::Current()); 589 if (waiting_for_signaling_) { 590 waiting_for_signaling_ = false; 591 AddAllocatorSession(allocator_->CreateSession( 592 SessionId(), content_name(), component(), ice_ufrag_, ice_pwd_)); 593 } 594 } 595 596 void P2PTransportChannel::OnUseCandidate(Connection* conn) { 597 ASSERT(worker_thread_ == talk_base::Thread::Current()); 598 ASSERT(ice_role_ == ICEROLE_CONTROLLED); 599 ASSERT(protocol_type_ == ICEPROTO_RFC5245); 600 if (conn->write_state() == Connection::STATE_WRITABLE) { 601 if (best_connection_ != conn) { 602 pending_best_connection_ = NULL; 603 SwitchBestConnectionTo(conn); 604 // Now we have selected the best connection, time to prune other existing 605 // connections and update the read/write state of the channel. 606 RequestSort(); 607 } 608 } else { 609 pending_best_connection_ = conn; 610 } 611 } 612 613 void P2PTransportChannel::OnCandidate(const Candidate& candidate) { 614 ASSERT(worker_thread_ == talk_base::Thread::Current()); 615 616 // Create connections to this remote candidate. 617 CreateConnections(candidate, NULL, false); 618 619 // Resort the connections list, which may have new elements. 620 SortConnections(); 621 } 622 623 // Creates connections from all of the ports that we care about to the given 624 // remote candidate. The return value is true if we created a connection from 625 // the origin port. 626 bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate, 627 PortInterface* origin_port, 628 bool readable) { 629 ASSERT(worker_thread_ == talk_base::Thread::Current()); 630 631 Candidate new_remote_candidate(remote_candidate); 632 new_remote_candidate.set_generation( 633 GetRemoteCandidateGeneration(remote_candidate)); 634 // ICE candidates don't need to have username and password set, but 635 // the code below this (specifically, ConnectionRequest::Prepare in 636 // port.cc) uses the remote candidates's username. So, we set it 637 // here. 638 if (remote_candidate.username().empty()) { 639 new_remote_candidate.set_username(remote_ice_ufrag_); 640 } 641 if (remote_candidate.password().empty()) { 642 new_remote_candidate.set_password(remote_ice_pwd_); 643 } 644 645 // Add a new connection for this candidate to every port that allows such a 646 // connection (i.e., if they have compatible protocols) and that does not 647 // already have a connection to an equivalent candidate. We must be careful 648 // to make sure that the origin port is included, even if it was pruned, 649 // since that may be the only port that can create this connection. 650 651 bool created = false; 652 653 std::vector<PortInterface *>::reverse_iterator it; 654 for (it = ports_.rbegin(); it != ports_.rend(); ++it) { 655 if (CreateConnection(*it, new_remote_candidate, origin_port, readable)) { 656 if (*it == origin_port) 657 created = true; 658 } 659 } 660 661 if ((origin_port != NULL) && 662 std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) { 663 if (CreateConnection( 664 origin_port, new_remote_candidate, origin_port, readable)) 665 created = true; 666 } 667 668 // Remember this remote candidate so that we can add it to future ports. 669 RememberRemoteCandidate(new_remote_candidate, origin_port); 670 671 return created; 672 } 673 674 // Setup a connection object for the local and remote candidate combination. 675 // And then listen to connection object for changes. 676 bool P2PTransportChannel::CreateConnection(PortInterface* port, 677 const Candidate& remote_candidate, 678 PortInterface* origin_port, 679 bool readable) { 680 // Look for an existing connection with this remote address. If one is not 681 // found, then we can create a new connection for this address. 682 Connection* connection = port->GetConnection(remote_candidate.address()); 683 if (connection != NULL) { 684 // It is not legal to try to change any of the parameters of an existing 685 // connection; however, the other side can send a duplicate candidate. 686 if (!remote_candidate.IsEquivalent(connection->remote_candidate())) { 687 LOG(INFO) << "Attempt to change a remote candidate"; 688 return false; 689 } 690 } else { 691 PortInterface::CandidateOrigin origin = GetOrigin(port, origin_port); 692 693 // Don't create connection if this is a candidate we received in a 694 // message and we are not allowed to make outgoing connections. 695 if (origin == cricket::PortInterface::ORIGIN_MESSAGE && incoming_only_) 696 return false; 697 698 connection = port->CreateConnection(remote_candidate, origin); 699 if (!connection) 700 return false; 701 702 AddConnection(connection); 703 704 LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", (" 705 << connections_.size() << " total)"; 706 } 707 708 // If we are readable, it is because we are creating this in response to a 709 // ping from the other side. This will cause the state to become readable. 710 if (readable) 711 connection->ReceivedPing(); 712 713 return true; 714 } 715 716 bool P2PTransportChannel::FindConnection( 717 cricket::Connection* connection) const { 718 std::vector<Connection*>::const_iterator citer = 719 std::find(connections_.begin(), connections_.end(), connection); 720 return citer != connections_.end(); 721 } 722 723 uint32 P2PTransportChannel::GetRemoteCandidateGeneration( 724 const Candidate& candidate) { 725 if (protocol_type_ == ICEPROTO_GOOGLE) { 726 // The Candidate.generation() can be trusted. Nothing needs to be done. 727 return candidate.generation(); 728 } 729 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245. 730 // Therefore we need to keep track of the remote ice restart so 731 // newer connections are prioritized over the older. 732 ASSERT(candidate.generation() == 0 || 733 candidate.generation() == remote_candidate_generation_); 734 return remote_candidate_generation_; 735 } 736 737 // Maintain our remote candidate list, adding this new remote one. 738 void P2PTransportChannel::RememberRemoteCandidate( 739 const Candidate& remote_candidate, PortInterface* origin_port) { 740 // Remove any candidates whose generation is older than this one. The 741 // presence of a new generation indicates that the old ones are not useful. 742 uint32 i = 0; 743 while (i < remote_candidates_.size()) { 744 if (remote_candidates_[i].generation() < remote_candidate.generation()) { 745 LOG(INFO) << "Pruning candidate from old generation: " 746 << remote_candidates_[i].address().ToSensitiveString(); 747 remote_candidates_.erase(remote_candidates_.begin() + i); 748 } else { 749 i += 1; 750 } 751 } 752 753 // Make sure this candidate is not a duplicate. 754 for (uint32 i = 0; i < remote_candidates_.size(); ++i) { 755 if (remote_candidates_[i].IsEquivalent(remote_candidate)) { 756 LOG(INFO) << "Duplicate candidate: " 757 << remote_candidate.address().ToSensitiveString(); 758 return; 759 } 760 } 761 762 // Try this candidate for all future ports. 763 remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port)); 764 } 765 766 // Set options on ourselves is simply setting options on all of our available 767 // port objects. 768 int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) { 769 OptionMap::iterator it = options_.find(opt); 770 if (it == options_.end()) { 771 options_.insert(std::make_pair(opt, value)); 772 } else if (it->second == value) { 773 return 0; 774 } else { 775 it->second = value; 776 } 777 778 for (uint32 i = 0; i < ports_.size(); ++i) { 779 int val = ports_[i]->SetOption(opt, value); 780 if (val < 0) { 781 // Because this also occurs deferred, probably no point in reporting an 782 // error 783 LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: " 784 << ports_[i]->GetError(); 785 } 786 } 787 return 0; 788 } 789 790 // Send data to the other side, using our best connection. 791 int P2PTransportChannel::SendPacket(const char *data, size_t len, 792 talk_base::DiffServCodePoint dscp, 793 int flags) { 794 ASSERT(worker_thread_ == talk_base::Thread::Current()); 795 if (flags != 0) { 796 error_ = EINVAL; 797 return -1; 798 } 799 if (best_connection_ == NULL) { 800 error_ = EWOULDBLOCK; 801 return -1; 802 } 803 804 int sent = best_connection_->Send(data, len, dscp); 805 if (sent <= 0) { 806 ASSERT(sent < 0); 807 error_ = best_connection_->GetError(); 808 } 809 return sent; 810 } 811 812 bool P2PTransportChannel::GetStats(ConnectionInfos *infos) { 813 ASSERT(worker_thread_ == talk_base::Thread::Current()); 814 // Gather connection infos. 815 infos->clear(); 816 817 std::vector<Connection *>::const_iterator it; 818 for (it = connections_.begin(); it != connections_.end(); ++it) { 819 Connection *connection = *it; 820 ConnectionInfo info; 821 info.best_connection = (best_connection_ == connection); 822 info.readable = 823 (connection->read_state() == Connection::STATE_READABLE); 824 info.writable = 825 (connection->write_state() == Connection::STATE_WRITABLE); 826 info.timeout = 827 (connection->write_state() == Connection::STATE_WRITE_TIMEOUT); 828 info.new_connection = !connection->reported(); 829 connection->set_reported(true); 830 info.rtt = connection->rtt(); 831 info.sent_total_bytes = connection->sent_total_bytes(); 832 info.sent_bytes_second = connection->sent_bytes_second(); 833 info.recv_total_bytes = connection->recv_total_bytes(); 834 info.recv_bytes_second = connection->recv_bytes_second(); 835 info.local_candidate = connection->local_candidate(); 836 info.remote_candidate = connection->remote_candidate(); 837 info.key = connection; 838 infos->push_back(info); 839 } 840 841 return true; 842 } 843 844 talk_base::DiffServCodePoint P2PTransportChannel::DefaultDscpValue() const { 845 OptionMap::const_iterator it = options_.find(talk_base::Socket::OPT_DSCP); 846 if (it == options_.end()) { 847 return talk_base::DSCP_NO_CHANGE; 848 } 849 return static_cast<talk_base::DiffServCodePoint> (it->second); 850 } 851 852 // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending) 853 void P2PTransportChannel::Allocate() { 854 // Time for a new allocator, lets make sure we have a signalling channel 855 // to communicate candidates through first. 856 waiting_for_signaling_ = true; 857 SignalRequestSignaling(this); 858 } 859 860 // Monitor connection states. 861 void P2PTransportChannel::UpdateConnectionStates() { 862 uint32 now = talk_base::Time(); 863 864 // We need to copy the list of connections since some may delete themselves 865 // when we call UpdateState. 866 for (uint32 i = 0; i < connections_.size(); ++i) 867 connections_[i]->UpdateState(now); 868 } 869 870 // Prepare for best candidate sorting. 871 void P2PTransportChannel::RequestSort() { 872 if (!sort_dirty_) { 873 worker_thread_->Post(this, MSG_SORT); 874 sort_dirty_ = true; 875 } 876 } 877 878 // Sort the available connections to find the best one. We also monitor 879 // the number of available connections and the current state. 880 void P2PTransportChannel::SortConnections() { 881 ASSERT(worker_thread_ == talk_base::Thread::Current()); 882 883 // Make sure the connection states are up-to-date since this affects how they 884 // will be sorted. 885 UpdateConnectionStates(); 886 887 // Any changes after this point will require a re-sort. 888 sort_dirty_ = false; 889 890 // Get a list of the networks that we are using. 891 std::set<talk_base::Network*> networks; 892 for (uint32 i = 0; i < connections_.size(); ++i) 893 networks.insert(connections_[i]->port()->Network()); 894 895 // Find the best alternative connection by sorting. It is important to note 896 // that amongst equal preference, writable connections, this will choose the 897 // one whose estimated latency is lowest. So it is the only one that we 898 // need to consider switching to. 899 900 ConnectionCompare cmp; 901 std::stable_sort(connections_.begin(), connections_.end(), cmp); 902 LOG(LS_VERBOSE) << "Sorting available connections:"; 903 for (uint32 i = 0; i < connections_.size(); ++i) { 904 LOG(LS_VERBOSE) << connections_[i]->ToString(); 905 } 906 907 Connection* top_connection = NULL; 908 if (connections_.size() > 0) 909 top_connection = connections_[0]; 910 911 // We don't want to pick the best connections if channel is using RFC5245 912 // and it's mode is CONTROLLED, as connections will be selected by the 913 // CONTROLLING agent. 914 915 // If necessary, switch to the new choice. 916 if (protocol_type_ != ICEPROTO_RFC5245 || ice_role_ == ICEROLE_CONTROLLING) { 917 if (ShouldSwitch(best_connection_, top_connection)) 918 SwitchBestConnectionTo(top_connection); 919 } 920 921 // We can prune any connection for which there is a writable connection on 922 // the same network with better or equal priority. We leave those with 923 // better priority just in case they become writable later (at which point, 924 // we would prune out the current best connection). We leave connections on 925 // other networks because they may not be using the same resources and they 926 // may represent very distinct paths over which we can switch. 927 std::set<talk_base::Network*>::iterator network; 928 for (network = networks.begin(); network != networks.end(); ++network) { 929 Connection* primier = GetBestConnectionOnNetwork(*network); 930 if (!primier || (primier->write_state() != Connection::STATE_WRITABLE)) 931 continue; 932 933 for (uint32 i = 0; i < connections_.size(); ++i) { 934 if ((connections_[i] != primier) && 935 (connections_[i]->port()->Network() == *network) && 936 (CompareConnectionCandidates(primier, connections_[i]) >= 0)) { 937 connections_[i]->Prune(); 938 } 939 } 940 } 941 942 // Check if all connections are timedout. 943 bool all_connections_timedout = true; 944 for (uint32 i = 0; i < connections_.size(); ++i) { 945 if (connections_[i]->write_state() != Connection::STATE_WRITE_TIMEOUT) { 946 all_connections_timedout = false; 947 break; 948 } 949 } 950 951 // Now update the writable state of the channel with the information we have 952 // so far. 953 if (best_connection_ && best_connection_->writable()) { 954 HandleWritable(); 955 } else if (all_connections_timedout) { 956 HandleAllTimedOut(); 957 } else { 958 HandleNotWritable(); 959 } 960 961 // Update the state of this channel. This method is called whenever the 962 // state of any connection changes, so this is a good place to do this. 963 UpdateChannelState(); 964 } 965 966 967 // Track the best connection, and let listeners know 968 void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) { 969 // Note: if conn is NULL, the previous best_connection_ has been destroyed, 970 // so don't use it. 971 Connection* old_best_connection = best_connection_; 972 best_connection_ = conn; 973 if (best_connection_) { 974 if (old_best_connection) { 975 LOG_J(LS_INFO, this) << "Previous best connection: " 976 << old_best_connection->ToString(); 977 } 978 LOG_J(LS_INFO, this) << "New best connection: " 979 << best_connection_->ToString(); 980 SignalRouteChange(this, best_connection_->remote_candidate()); 981 } else { 982 LOG_J(LS_INFO, this) << "No best connection"; 983 } 984 } 985 986 void P2PTransportChannel::UpdateChannelState() { 987 // The Handle* functions already set the writable state. We'll just double- 988 // check it here. 989 bool writable = ((best_connection_ != NULL) && 990 (best_connection_->write_state() == 991 Connection::STATE_WRITABLE)); 992 ASSERT(writable == this->writable()); 993 if (writable != this->writable()) 994 LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch"; 995 996 bool readable = false; 997 for (uint32 i = 0; i < connections_.size(); ++i) { 998 if (connections_[i]->read_state() == Connection::STATE_READABLE) 999 readable = true; 1000 } 1001 set_readable(readable); 1002 } 1003 1004 // We checked the status of our connections and we had at least one that 1005 // was writable, go into the writable state. 1006 void P2PTransportChannel::HandleWritable() { 1007 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1008 if (!writable()) { 1009 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) { 1010 if (allocator_sessions_[i]->IsGettingPorts()) { 1011 allocator_sessions_[i]->StopGettingPorts(); 1012 } 1013 } 1014 } 1015 1016 was_writable_ = true; 1017 set_writable(true); 1018 } 1019 1020 // Notify upper layer about channel not writable state, if it was before. 1021 void P2PTransportChannel::HandleNotWritable() { 1022 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1023 if (was_writable_) { 1024 was_writable_ = false; 1025 set_writable(false); 1026 } 1027 } 1028 1029 void P2PTransportChannel::HandleAllTimedOut() { 1030 // Currently we are treating this as channel not writable. 1031 HandleNotWritable(); 1032 } 1033 1034 // If we have a best connection, return it, otherwise return top one in the 1035 // list (later we will mark it best). 1036 Connection* P2PTransportChannel::GetBestConnectionOnNetwork( 1037 talk_base::Network* network) { 1038 // If the best connection is on this network, then it wins. 1039 if (best_connection_ && (best_connection_->port()->Network() == network)) 1040 return best_connection_; 1041 1042 // Otherwise, we return the top-most in sorted order. 1043 for (uint32 i = 0; i < connections_.size(); ++i) { 1044 if (connections_[i]->port()->Network() == network) 1045 return connections_[i]; 1046 } 1047 1048 return NULL; 1049 } 1050 1051 // Handle any queued up requests 1052 void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) { 1053 switch (pmsg->message_id) { 1054 case MSG_SORT: 1055 OnSort(); 1056 break; 1057 case MSG_PING: 1058 OnPing(); 1059 break; 1060 default: 1061 ASSERT(false); 1062 break; 1063 } 1064 } 1065 1066 // Handle queued up sort request 1067 void P2PTransportChannel::OnSort() { 1068 // Resort the connections based on the new statistics. 1069 SortConnections(); 1070 } 1071 1072 // Handle queued up ping request 1073 void P2PTransportChannel::OnPing() { 1074 // Make sure the states of the connections are up-to-date (since this affects 1075 // which ones are pingable). 1076 UpdateConnectionStates(); 1077 1078 // Find the oldest pingable connection and have it do a ping. 1079 Connection* conn = FindNextPingableConnection(); 1080 if (conn) 1081 PingConnection(conn); 1082 1083 // Post ourselves a message to perform the next ping. 1084 uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY; 1085 thread()->PostDelayed(delay, this, MSG_PING); 1086 } 1087 1088 // Is the connection in a state for us to even consider pinging the other side? 1089 bool P2PTransportChannel::IsPingable(Connection* conn) { 1090 // An unconnected connection cannot be written to at all, so pinging is out 1091 // of the question. 1092 if (!conn->connected()) 1093 return false; 1094 1095 if (writable()) { 1096 // If we are writable, then we only want to ping connections that could be 1097 // better than this one, i.e., the ones that were not pruned. 1098 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT); 1099 } else { 1100 // If we are not writable, then we need to try everything that might work. 1101 // This includes both connections that do not have write timeout as well as 1102 // ones that do not have read timeout. A connection could be readable but 1103 // be in write-timeout if we pruned it before. Since the other side is 1104 // still pinging it, it very well might still work. 1105 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) || 1106 (conn->read_state() != Connection::STATE_READ_TIMEOUT); 1107 } 1108 } 1109 1110 // Returns the next pingable connection to ping. This will be the oldest 1111 // pingable connection unless we have a writable connection that is past the 1112 // maximum acceptable ping delay. 1113 Connection* P2PTransportChannel::FindNextPingableConnection() { 1114 uint32 now = talk_base::Time(); 1115 if (best_connection_ && 1116 (best_connection_->write_state() == Connection::STATE_WRITABLE) && 1117 (best_connection_->last_ping_sent() 1118 + MAX_CURRENT_WRITABLE_DELAY <= now)) { 1119 return best_connection_; 1120 } 1121 1122 Connection* oldest_conn = NULL; 1123 uint32 oldest_time = 0xFFFFFFFF; 1124 for (uint32 i = 0; i < connections_.size(); ++i) { 1125 if (IsPingable(connections_[i])) { 1126 if (connections_[i]->last_ping_sent() < oldest_time) { 1127 oldest_time = connections_[i]->last_ping_sent(); 1128 oldest_conn = connections_[i]; 1129 } 1130 } 1131 } 1132 return oldest_conn; 1133 } 1134 1135 // Apart from sending ping from |conn| this method also updates 1136 // |use_candidate_attr| flag. The criteria to update this flag is 1137 // explained below. 1138 // Set USE-CANDIDATE if doing ICE AND this channel is in CONTROLLING AND 1139 // a) Channel is in FULL ICE AND 1140 // a.1) |conn| is the best connection OR 1141 // a.2) there is no best connection OR 1142 // a.3) the best connection is unwritable OR 1143 // a.4) |conn| has higher priority than best_connection. 1144 // b) we're doing LITE ICE AND 1145 // b.1) |conn| is the best_connection AND 1146 // b.2) |conn| is writable. 1147 void P2PTransportChannel::PingConnection(Connection* conn) { 1148 bool use_candidate = false; 1149 if (protocol_type_ == ICEPROTO_RFC5245) { 1150 if (remote_ice_mode_ == ICEMODE_FULL && ice_role_ == ICEROLE_CONTROLLING) { 1151 use_candidate = (conn == best_connection_) || 1152 (best_connection_ == NULL) || 1153 (!best_connection_->writable()) || 1154 (conn->priority() > best_connection_->priority()); 1155 } else if (remote_ice_mode_ == ICEMODE_LITE && conn == best_connection_) { 1156 use_candidate = best_connection_->writable(); 1157 } 1158 } 1159 conn->set_use_candidate_attr(use_candidate); 1160 conn->Ping(talk_base::Time()); 1161 } 1162 1163 // When a connection's state changes, we need to figure out who to use as 1164 // the best connection again. It could have become usable, or become unusable. 1165 void P2PTransportChannel::OnConnectionStateChange(Connection* connection) { 1166 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1167 1168 // Update the best connection if the state change is from pending best 1169 // connection and role is controlled. 1170 if (protocol_type_ == ICEPROTO_RFC5245 && ice_role_ == ICEROLE_CONTROLLED) { 1171 if (connection == pending_best_connection_ && connection->writable()) { 1172 pending_best_connection_ = NULL; 1173 SwitchBestConnectionTo(connection); 1174 } 1175 } 1176 1177 // We have to unroll the stack before doing this because we may be changing 1178 // the state of connections while sorting. 1179 RequestSort(); 1180 } 1181 1182 // When a connection is removed, edit it out, and then update our best 1183 // connection. 1184 void P2PTransportChannel::OnConnectionDestroyed(Connection* connection) { 1185 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1186 1187 // Note: the previous best_connection_ may be destroyed by now, so don't 1188 // use it. 1189 1190 // Remove this connection from the list. 1191 std::vector<Connection*>::iterator iter = 1192 std::find(connections_.begin(), connections_.end(), connection); 1193 ASSERT(iter != connections_.end()); 1194 connections_.erase(iter); 1195 1196 LOG_J(LS_INFO, this) << "Removed connection (" 1197 << static_cast<int>(connections_.size()) << " remaining)"; 1198 1199 if (pending_best_connection_ == connection) { 1200 pending_best_connection_ = NULL; 1201 } 1202 1203 // If this is currently the best connection, then we need to pick a new one. 1204 // The call to SortConnections will pick a new one. It looks at the current 1205 // best connection in order to avoid switching between fairly similar ones. 1206 // Since this connection is no longer an option, we can just set best to NULL 1207 // and re-choose a best assuming that there was no best connection. 1208 if (best_connection_ == connection) { 1209 SwitchBestConnectionTo(NULL); 1210 RequestSort(); 1211 } 1212 } 1213 1214 // When a port is destroyed remove it from our list of ports to use for 1215 // connection attempts. 1216 void P2PTransportChannel::OnPortDestroyed(PortInterface* port) { 1217 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1218 1219 // Remove this port from the list (if we didn't drop it already). 1220 std::vector<PortInterface*>::iterator iter = 1221 std::find(ports_.begin(), ports_.end(), port); 1222 if (iter != ports_.end()) 1223 ports_.erase(iter); 1224 1225 LOG(INFO) << "Removed port from p2p socket: " 1226 << static_cast<int>(ports_.size()) << " remaining"; 1227 } 1228 1229 // We data is available, let listeners know 1230 void P2PTransportChannel::OnReadPacket( 1231 Connection *connection, const char *data, size_t len, 1232 const talk_base::PacketTime& packet_time) { 1233 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1234 1235 // Do not deliver, if packet doesn't belong to the correct transport channel. 1236 if (!FindConnection(connection)) 1237 return; 1238 1239 // Let the client know of an incoming packet 1240 SignalReadPacket(this, data, len, packet_time, 0); 1241 } 1242 1243 void P2PTransportChannel::OnReadyToSend(Connection* connection) { 1244 if (connection == best_connection_ && writable()) { 1245 SignalReadyToSend(this); 1246 } 1247 } 1248 1249 } // namespace cricket 1250