1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/p2ptransportchannel.h" 29 30 #include <set> 31 #include "talk/base/common.h" 32 #include "talk/base/logging.h" 33 #include "talk/p2p/base/common.h" 34 35 namespace { 36 37 // messages for queuing up work for ourselves 38 const uint32 MSG_SORT = 1; 39 const uint32 MSG_PING = 2; 40 const uint32 MSG_ALLOCATE = 3; 41 42 // When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers) 43 // for pinging. When the socket is writable, we will use only 1 Kbps because 44 // we don't want to degrade the quality on a modem. These numbers should work 45 // well on a 28.8K modem, which is the slowest connection on which the voice 46 // quality is reasonable at all. 47 static const uint32 PING_PACKET_SIZE = 60 * 8; 48 static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480ms 49 static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000; // 50ms 50 51 // If there is a current writable connection, then we will also try hard to 52 // make sure it is pinged at this rate. 53 static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit 54 55 // The minimum improvement in RTT that justifies a switch. 56 static const double kMinImprovement = 10; 57 58 // Amount of time that we wait when *losing* writability before we try doing 59 // another allocation. 60 static const int kAllocateDelay = 1 * 1000; // 1 second 61 62 // We will try creating a new allocator from scratch after a delay of this 63 // length without becoming writable (or timing out). 64 static const int kAllocatePeriod = 20 * 1000; // 20 seconds 65 66 cricket::Port::CandidateOrigin GetOrigin(cricket::Port* port, 67 cricket::Port* origin_port) { 68 if (!origin_port) 69 return cricket::Port::ORIGIN_MESSAGE; 70 else if (port == origin_port) 71 return cricket::Port::ORIGIN_THIS_PORT; 72 else 73 return cricket::Port::ORIGIN_OTHER_PORT; 74 } 75 76 // Compares two connections based only on static information about them. 77 int CompareConnectionCandidates(cricket::Connection* a, 78 cricket::Connection* b) { 79 // Combine local and remote preferences 80 ASSERT(a->local_candidate().preference() == a->port()->preference()); 81 ASSERT(b->local_candidate().preference() == b->port()->preference()); 82 double a_pref = a->local_candidate().preference() 83 * a->remote_candidate().preference(); 84 double b_pref = b->local_candidate().preference() 85 * b->remote_candidate().preference(); 86 87 // Now check combined preferences. Lower values get sorted last. 88 if (a_pref > b_pref) 89 return 1; 90 if (a_pref < b_pref) 91 return -1; 92 93 return 0; 94 } 95 96 // Compare two connections based on their writability and static preferences. 97 int CompareConnections(cricket::Connection *a, cricket::Connection *b) { 98 // Sort based on write-state. Better states have lower values. 99 if (a->write_state() < b->write_state()) 100 return 1; 101 if (a->write_state() > b->write_state()) 102 return -1; 103 104 // Compare the candidate information. 105 return CompareConnectionCandidates(a, b); 106 } 107 108 // Wraps the comparison connection into a less than operator that puts higher 109 // priority writable connections first. 110 class ConnectionCompare { 111 public: 112 bool operator()(const cricket::Connection *ca, 113 const cricket::Connection *cb) { 114 cricket::Connection* a = const_cast<cricket::Connection*>(ca); 115 cricket::Connection* b = const_cast<cricket::Connection*>(cb); 116 117 // Compare first on writability and static preferences. 118 int cmp = CompareConnections(a, b); 119 if (cmp > 0) 120 return true; 121 if (cmp < 0) 122 return false; 123 124 // Otherwise, sort based on latency estimate. 125 return a->rtt() < b->rtt(); 126 127 // Should we bother checking for the last connection that last received 128 // data? It would help rendezvous on the connection that is also receiving 129 // packets. 130 // 131 // TODO: Yes we should definitely do this. The TCP protocol gains 132 // efficiency by being used bidirectionally, as opposed to two separate 133 // unidirectional streams. This test should probably occur before 134 // comparison of local prefs (assuming combined prefs are the same). We 135 // need to be careful though, not to bounce back and forth with both sides 136 // trying to rendevous with the other. 137 } 138 }; 139 140 // Determines whether we should switch between two connections, based first on 141 // static preferences and then (if those are equal) on latency estimates. 142 bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) { 143 if (a_conn == b_conn) 144 return false; 145 146 if (!a_conn || !b_conn) // don't think the latter should happen 147 return true; 148 149 int prefs_cmp = CompareConnections(a_conn, b_conn); 150 if (prefs_cmp < 0) 151 return true; 152 if (prefs_cmp > 0) 153 return false; 154 155 return b_conn->rtt() <= a_conn->rtt() + kMinImprovement; 156 } 157 158 } // unnamed namespace 159 160 namespace cricket { 161 162 P2PTransportChannel::P2PTransportChannel(const std::string &name, 163 const std::string &content_type, 164 P2PTransport* transport, 165 PortAllocator *allocator) : 166 TransportChannelImpl(name, content_type), 167 transport_(transport), 168 allocator_(allocator), 169 worker_thread_(talk_base::Thread::Current()), 170 waiting_for_signaling_(false), 171 error_(0), 172 best_connection_(NULL), 173 pinging_started_(false), 174 sort_dirty_(false), 175 was_writable_(false), 176 was_timed_out_(true) { 177 } 178 179 P2PTransportChannel::~P2PTransportChannel() { 180 ASSERT(worker_thread_ == talk_base::Thread::Current()); 181 182 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 183 delete allocator_sessions_[i]; 184 } 185 186 // Add the allocator session to our list so that we know which sessions 187 // are still active. 188 void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) { 189 session->set_generation(static_cast<uint32>(allocator_sessions_.size())); 190 allocator_sessions_.push_back(session); 191 192 // We now only want to apply new candidates that we receive to the ports 193 // created by this new session because these are replacing those of the 194 // previous sessions. 195 ports_.clear(); 196 197 session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady); 198 session->SignalCandidatesReady.connect( 199 this, &P2PTransportChannel::OnCandidatesReady); 200 session->GetInitialPorts(); 201 if (pinging_started_) 202 session->StartGetAllPorts(); 203 } 204 205 // Go into the state of processing candidates, and running in general 206 void P2PTransportChannel::Connect() { 207 ASSERT(worker_thread_ == talk_base::Thread::Current()); 208 209 // Kick off an allocator session 210 Allocate(); 211 212 // Start pinging as the ports come in. 213 thread()->Post(this, MSG_PING); 214 } 215 216 // Reset the socket, clear up any previous allocations and start over 217 void P2PTransportChannel::Reset() { 218 ASSERT(worker_thread_ == talk_base::Thread::Current()); 219 220 // Get rid of all the old allocators. This should clean up everything. 221 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 222 delete allocator_sessions_[i]; 223 224 allocator_sessions_.clear(); 225 ports_.clear(); 226 connections_.clear(); 227 best_connection_ = NULL; 228 229 // Forget about all of the candidates we got before. 230 remote_candidates_.clear(); 231 232 // Revert to the initial state. 233 set_readable(false); 234 set_writable(false); 235 236 // Reinitialize the rest of our state. 237 waiting_for_signaling_ = false; 238 pinging_started_ = false; 239 sort_dirty_ = false; 240 was_writable_ = false; 241 was_timed_out_ = true; 242 243 // If we allocated before, start a new one now. 244 if (transport_->connect_requested()) 245 Allocate(); 246 247 // Start pinging as the ports come in. 248 thread()->Clear(this); 249 thread()->Post(this, MSG_PING); 250 } 251 252 // A new port is available, attempt to make connections for it 253 void P2PTransportChannel::OnPortReady(PortAllocatorSession *session, 254 Port* port) { 255 ASSERT(worker_thread_ == talk_base::Thread::Current()); 256 257 // Set in-effect options on the new port 258 for (OptionMap::const_iterator it = options_.begin(); 259 it != options_.end(); 260 ++it) { 261 int val = port->SetOption(it->first, it->second); 262 if (val < 0) { 263 LOG_J(LS_WARNING, port) << "SetOption(" << it->first 264 << ", " << it->second 265 << ") failed: " << port->GetError(); 266 } 267 } 268 269 // Remember the ports and candidates, and signal that candidates are ready. 270 // The session will handle this, and send an initiate/accept/modify message 271 // if one is pending. 272 273 ports_.push_back(port); 274 port->SignalUnknownAddress.connect( 275 this, &P2PTransportChannel::OnUnknownAddress); 276 port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed); 277 278 // Attempt to create a connection from this new port to all of the remote 279 // candidates that we were given so far. 280 281 std::vector<RemoteCandidate>::iterator iter; 282 for (iter = remote_candidates_.begin(); iter != remote_candidates_.end(); 283 ++iter) 284 CreateConnection(port, *iter, iter->origin_port(), false); 285 286 SortConnections(); 287 } 288 289 // A new candidate is available, let listeners know 290 void P2PTransportChannel::OnCandidatesReady( 291 PortAllocatorSession *session, const std::vector<Candidate>& candidates) { 292 for (size_t i = 0; i < candidates.size(); ++i) { 293 SignalCandidateReady(this, candidates[i]); 294 } 295 } 296 297 // Handle stun packets 298 void P2PTransportChannel::OnUnknownAddress( 299 Port *port, const talk_base::SocketAddress &address, StunMessage *stun_msg, 300 const std::string &remote_username) { 301 ASSERT(worker_thread_ == talk_base::Thread::Current()); 302 303 // Port has received a valid stun packet from an address that no Connection 304 // is currently available for. See if the remote user name is in the remote 305 // candidate list. If it isn't return error to the stun request. 306 307 const Candidate *candidate = NULL; 308 std::vector<RemoteCandidate>::iterator it; 309 for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) { 310 if ((*it).username() == remote_username) { 311 candidate = &(*it); 312 break; 313 } 314 } 315 if (candidate == NULL) { 316 // Don't know about this username, the request is bogus 317 // This sometimes happens if a binding response comes in before the ACCEPT 318 // message. It is totally valid; the retry state machine will try again. 319 320 port->SendBindingErrorResponse(stun_msg, address, 321 STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS); 322 delete stun_msg; 323 return; 324 } 325 326 // Check for connectivity to this address. Create connections 327 // to this address across all local ports. First, add this as a new remote 328 // address 329 330 Candidate new_remote_candidate = *candidate; 331 new_remote_candidate.set_address(address); 332 // new_remote_candidate.set_protocol(port->protocol()); 333 334 // This remote username exists. Now create connections using this candidate, 335 // and resort 336 337 if (CreateConnections(new_remote_candidate, port, true)) { 338 // Send the pinger a successful stun response. 339 port->SendBindingResponse(stun_msg, address); 340 341 // Update the list of connections since we just added another. We do this 342 // after sending the response since it could (in principle) delete the 343 // connection in question. 344 SortConnections(); 345 } else { 346 // Hopefully this won't occur, because changing a destination address 347 // shouldn't cause a new connection to fail 348 ASSERT(false); 349 port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR, 350 STUN_ERROR_REASON_SERVER_ERROR); 351 } 352 353 delete stun_msg; 354 } 355 356 void P2PTransportChannel::OnCandidate(const Candidate& candidate) { 357 ASSERT(worker_thread_ == talk_base::Thread::Current()); 358 359 // Create connections to this remote candidate. 360 CreateConnections(candidate, NULL, false); 361 362 // Resort the connections list, which may have new elements. 363 SortConnections(); 364 } 365 366 // Creates connections from all of the ports that we care about to the given 367 // remote candidate. The return value is true if we created a connection from 368 // the origin port. 369 bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate, 370 Port* origin_port, 371 bool readable) { 372 ASSERT(worker_thread_ == talk_base::Thread::Current()); 373 374 // Add a new connection for this candidate to every port that allows such a 375 // connection (i.e., if they have compatible protocols) and that does not 376 // already have a connection to an equivalent candidate. We must be careful 377 // to make sure that the origin port is included, even if it was pruned, 378 // since that may be the only port that can create this connection. 379 380 bool created = false; 381 382 std::vector<Port *>::reverse_iterator it; 383 for (it = ports_.rbegin(); it != ports_.rend(); ++it) { 384 if (CreateConnection(*it, remote_candidate, origin_port, readable)) { 385 if (*it == origin_port) 386 created = true; 387 } 388 } 389 390 if ((origin_port != NULL) && 391 std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) { 392 if (CreateConnection(origin_port, remote_candidate, origin_port, readable)) 393 created = true; 394 } 395 396 // Remember this remote candidate so that we can add it to future ports. 397 RememberRemoteCandidate(remote_candidate, origin_port); 398 399 return created; 400 } 401 402 // Setup a connection object for the local and remote candidate combination. 403 // And then listen to connection object for changes. 404 bool P2PTransportChannel::CreateConnection(Port* port, 405 const Candidate& remote_candidate, 406 Port* origin_port, 407 bool readable) { 408 // Look for an existing connection with this remote address. If one is not 409 // found, then we can create a new connection for this address. 410 Connection* connection = port->GetConnection(remote_candidate.address()); 411 if (connection != NULL) { 412 // It is not legal to try to change any of the parameters of an existing 413 // connection; however, the other side can send a duplicate candidate. 414 if (!remote_candidate.IsEquivalent(connection->remote_candidate())) { 415 LOG(INFO) << "Attempt to change a remote candidate"; 416 return false; 417 } 418 } else { 419 Port::CandidateOrigin origin = GetOrigin(port, origin_port); 420 connection = port->CreateConnection(remote_candidate, origin); 421 if (!connection) 422 return false; 423 424 connections_.push_back(connection); 425 connection->SignalReadPacket.connect( 426 this, &P2PTransportChannel::OnReadPacket); 427 connection->SignalStateChange.connect( 428 this, &P2PTransportChannel::OnConnectionStateChange); 429 connection->SignalDestroyed.connect( 430 this, &P2PTransportChannel::OnConnectionDestroyed); 431 432 LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", (" 433 << connections_.size() << " total)"; 434 } 435 436 // If we are readable, it is because we are creating this in response to a 437 // ping from the other side. This will cause the state to become readable. 438 if (readable) 439 connection->ReceivedPing(); 440 441 return true; 442 } 443 444 // Maintain our remote candidate list, adding this new remote one. 445 void P2PTransportChannel::RememberRemoteCandidate( 446 const Candidate& remote_candidate, Port* origin_port) { 447 // Remove any candidates whose generation is older than this one. The 448 // presence of a new generation indicates that the old ones are not useful. 449 uint32 i = 0; 450 while (i < remote_candidates_.size()) { 451 if (remote_candidates_[i].generation() < remote_candidate.generation()) { 452 LOG(INFO) << "Pruning candidate from old generation: " 453 << remote_candidates_[i].address().ToString(); 454 remote_candidates_.erase(remote_candidates_.begin() + i); 455 } else { 456 i += 1; 457 } 458 } 459 460 // Make sure this candidate is not a duplicate. 461 for (uint32 i = 0; i < remote_candidates_.size(); ++i) { 462 if (remote_candidates_[i].IsEquivalent(remote_candidate)) { 463 LOG(INFO) << "Duplicate candidate: " 464 << remote_candidate.address().ToString(); 465 return; 466 } 467 } 468 469 // Try this candidate for all future ports. 470 remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port)); 471 472 // We have some candidates from the other side, we are now serious about 473 // this connection. Let's do the StartGetAllPorts thing. 474 if (!pinging_started_) { 475 pinging_started_ = true; 476 for (size_t i = 0; i < allocator_sessions_.size(); ++i) { 477 if (!allocator_sessions_[i]->IsGettingAllPorts()) 478 allocator_sessions_[i]->StartGetAllPorts(); 479 } 480 } 481 } 482 483 // Send data to the other side, using our best connection 484 int P2PTransportChannel::SendPacket(const char *data, size_t len) { 485 // This can get called on any thread that is convenient to write from! 486 if (best_connection_ == NULL) { 487 error_ = EWOULDBLOCK; 488 return SOCKET_ERROR; 489 } 490 int sent = best_connection_->Send(data, len); 491 if (sent <= 0) { 492 ASSERT(sent < 0); 493 error_ = best_connection_->GetError(); 494 } 495 return sent; 496 } 497 498 // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending) 499 void P2PTransportChannel::Allocate() { 500 CancelPendingAllocate(); 501 // Time for a new allocator, lets make sure we have a signalling channel 502 // to communicate candidates through first. 503 waiting_for_signaling_ = true; 504 SignalRequestSignaling(); 505 } 506 507 // Cancels the pending allocate, if any. 508 void P2PTransportChannel::CancelPendingAllocate() { 509 thread()->Clear(this, MSG_ALLOCATE); 510 } 511 512 // Monitor connection states 513 void P2PTransportChannel::UpdateConnectionStates() { 514 uint32 now = talk_base::Time(); 515 516 // We need to copy the list of connections since some may delete themselves 517 // when we call UpdateState. 518 for (uint32 i = 0; i < connections_.size(); ++i) 519 connections_[i]->UpdateState(now); 520 } 521 522 // Prepare for best candidate sorting 523 void P2PTransportChannel::RequestSort() { 524 if (!sort_dirty_) { 525 worker_thread_->Post(this, MSG_SORT); 526 sort_dirty_ = true; 527 } 528 } 529 530 // Sort the available connections to find the best one. We also monitor 531 // the number of available connections and the current state so that we 532 // can possibly kick off more allocators (for more connections). 533 void P2PTransportChannel::SortConnections() { 534 ASSERT(worker_thread_ == talk_base::Thread::Current()); 535 536 // Make sure the connection states are up-to-date since this affects how they 537 // will be sorted. 538 UpdateConnectionStates(); 539 540 // Any changes after this point will require a re-sort. 541 sort_dirty_ = false; 542 543 // Get a list of the networks that we are using. 544 std::set<talk_base::Network*> networks; 545 for (uint32 i = 0; i < connections_.size(); ++i) 546 networks.insert(connections_[i]->port()->network()); 547 548 // Find the best alternative connection by sorting. It is important to note 549 // that amongst equal preference, writable connections, this will choose the 550 // one whose estimated latency is lowest. So it is the only one that we 551 // need to consider switching to. 552 553 ConnectionCompare cmp; 554 std::stable_sort(connections_.begin(), connections_.end(), cmp); 555 Connection* top_connection = NULL; 556 if (connections_.size() > 0) 557 top_connection = connections_[0]; 558 559 // If necessary, switch to the new choice. 560 if (ShouldSwitch(best_connection_, top_connection)) 561 SwitchBestConnectionTo(top_connection); 562 563 // We can prune any connection for which there is a writable connection on 564 // the same network with better or equal prefences. We leave those with 565 // better preference just in case they become writable later (at which point, 566 // we would prune out the current best connection). We leave connections on 567 // other networks because they may not be using the same resources and they 568 // may represent very distinct paths over which we can switch. 569 std::set<talk_base::Network*>::iterator network; 570 for (network = networks.begin(); network != networks.end(); ++network) { 571 Connection* primier = GetBestConnectionOnNetwork(*network); 572 if (!primier || (primier->write_state() != Connection::STATE_WRITABLE)) 573 continue; 574 575 for (uint32 i = 0; i < connections_.size(); ++i) { 576 if ((connections_[i] != primier) && 577 (connections_[i]->port()->network() == *network) && 578 (CompareConnectionCandidates(primier, connections_[i]) >= 0)) { 579 connections_[i]->Prune(); 580 } 581 } 582 } 583 584 // Count the number of connections in the various states. 585 586 int writable = 0; 587 int write_connect = 0; 588 int write_timeout = 0; 589 590 for (uint32 i = 0; i < connections_.size(); ++i) { 591 switch (connections_[i]->write_state()) { 592 case Connection::STATE_WRITABLE: 593 ++writable; 594 break; 595 case Connection::STATE_WRITE_CONNECT: 596 ++write_connect; 597 break; 598 case Connection::STATE_WRITE_TIMEOUT: 599 ++write_timeout; 600 break; 601 default: 602 ASSERT(false); 603 } 604 } 605 606 if (writable > 0) { 607 HandleWritable(); 608 } else if (write_connect > 0) { 609 HandleNotWritable(); 610 } else { 611 HandleAllTimedOut(); 612 } 613 614 // Update the state of this channel. This method is called whenever the 615 // state of any connection changes, so this is a good place to do this. 616 UpdateChannelState(); 617 618 // Notify of connection state change 619 SignalConnectionMonitor(this); 620 } 621 622 // Track the best connection, and let listeners know 623 void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) { 624 // Note: if conn is NULL, the previous best_connection_ has been destroyed, 625 // so don't use it. 626 // use it. 627 Connection* old_best_connection = best_connection_; 628 best_connection_ = conn; 629 if (best_connection_) { 630 if (old_best_connection) { 631 LOG_J(LS_INFO, this) << "Previous best connection: " 632 << old_best_connection->ToString(); 633 } 634 LOG_J(LS_INFO, this) << "New best connection: " 635 << best_connection_->ToString(); 636 SignalRouteChange(this, best_connection_->remote_candidate().address()); 637 } else { 638 LOG_J(LS_INFO, this) << "No best connection"; 639 } 640 } 641 642 void P2PTransportChannel::UpdateChannelState() { 643 // The Handle* functions already set the writable state. We'll just double- 644 // check it here. 645 bool writable = ((best_connection_ != NULL) && 646 (best_connection_->write_state() == 647 Connection::STATE_WRITABLE)); 648 ASSERT(writable == this->writable()); 649 if (writable != this->writable()) 650 LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch"; 651 652 bool readable = false; 653 for (uint32 i = 0; i < connections_.size(); ++i) { 654 if (connections_[i]->read_state() == Connection::STATE_READABLE) 655 readable = true; 656 } 657 set_readable(readable); 658 } 659 660 // We checked the status of our connections and we had at least one that 661 // was writable, go into the writable state. 662 void P2PTransportChannel::HandleWritable() { 663 // 664 // One or more connections writable! 665 // 666 if (!writable()) { 667 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) { 668 if (allocator_sessions_[i]->IsGettingAllPorts()) { 669 allocator_sessions_[i]->StopGetAllPorts(); 670 } 671 } 672 673 // Stop further allocations. 674 CancelPendingAllocate(); 675 } 676 677 // We're writable, obviously we aren't timed out 678 was_writable_ = true; 679 was_timed_out_ = false; 680 set_writable(true); 681 } 682 683 // We checked the status of our connections and we didn't have any that 684 // were writable, go into the connecting state (kick off a new allocator 685 // session). 686 void P2PTransportChannel::HandleNotWritable() { 687 // 688 // No connections are writable but not timed out! 689 // 690 if (was_writable_) { 691 // If we were writable, let's kick off an allocator session immediately 692 was_writable_ = false; 693 Allocate(); 694 } 695 696 // We were connecting, obviously not ALL timed out. 697 was_timed_out_ = false; 698 set_writable(false); 699 } 700 701 // We checked the status of our connections and not only weren't they writable 702 // but they were also timed out, we really need a new allocator. 703 void P2PTransportChannel::HandleAllTimedOut() { 704 // 705 // No connections... all are timed out! 706 // 707 if (!was_timed_out_) { 708 // We weren't timed out before, so kick off an allocator now (we'll still 709 // be in the fully timed out state until the allocator actually gives back 710 // new ports) 711 Allocate(); 712 } 713 714 // NOTE: we start was_timed_out_ in the true state so that we don't get 715 // another allocator created WHILE we are in the process of building up 716 // our first allocator. 717 was_timed_out_ = true; 718 was_writable_ = false; 719 set_writable(false); 720 } 721 722 // If we have a best connection, return it, otherwise return top one in the 723 // list (later we will mark it best). 724 Connection* P2PTransportChannel::GetBestConnectionOnNetwork( 725 talk_base::Network* network) { 726 // If the best connection is on this network, then it wins. 727 if (best_connection_ && (best_connection_->port()->network() == network)) 728 return best_connection_; 729 730 // Otherwise, we return the top-most in sorted order. 731 for (uint32 i = 0; i < connections_.size(); ++i) { 732 if (connections_[i]->port()->network() == network) 733 return connections_[i]; 734 } 735 736 return NULL; 737 } 738 739 // Handle any queued up requests 740 void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) { 741 if (pmsg->message_id == MSG_SORT) 742 OnSort(); 743 else if (pmsg->message_id == MSG_PING) 744 OnPing(); 745 else if (pmsg->message_id == MSG_ALLOCATE) 746 Allocate(); 747 else 748 ASSERT(false); 749 } 750 751 // Handle queued up sort request 752 void P2PTransportChannel::OnSort() { 753 // Resort the connections based on the new statistics. 754 SortConnections(); 755 } 756 757 // Handle queued up ping request 758 void P2PTransportChannel::OnPing() { 759 // Make sure the states of the connections are up-to-date (since this affects 760 // which ones are pingable). 761 UpdateConnectionStates(); 762 763 // Find the oldest pingable connection and have it do a ping. 764 Connection* conn = FindNextPingableConnection(); 765 if (conn) 766 conn->Ping(talk_base::Time()); 767 768 // Post ourselves a message to perform the next ping. 769 uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY; 770 thread()->PostDelayed(delay, this, MSG_PING); 771 } 772 773 // Is the connection in a state for us to even consider pinging the other side? 774 bool P2PTransportChannel::IsPingable(Connection* conn) { 775 // An unconnected connection cannot be written to at all, so pinging is out 776 // of the question. 777 if (!conn->connected()) 778 return false; 779 780 if (writable()) { 781 // If we are writable, then we only want to ping connections that could be 782 // better than this one, i.e., the ones that were not pruned. 783 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT); 784 } else { 785 // If we are not writable, then we need to try everything that might work. 786 // This includes both connections that do not have write timeout as well as 787 // ones that do not have read timeout. A connection could be readable but 788 // be in write-timeout if we pruned it before. Since the other side is 789 // still pinging it, it very well might still work. 790 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) || 791 (conn->read_state() != Connection::STATE_READ_TIMEOUT); 792 } 793 } 794 795 // Returns the next pingable connection to ping. This will be the oldest 796 // pingable connection unless we have a writable connection that is past the 797 // maximum acceptable ping delay. 798 Connection* P2PTransportChannel::FindNextPingableConnection() { 799 uint32 now = talk_base::Time(); 800 if (best_connection_ && 801 (best_connection_->write_state() == Connection::STATE_WRITABLE) && 802 (best_connection_->last_ping_sent() 803 + MAX_CURRENT_WRITABLE_DELAY <= now)) { 804 return best_connection_; 805 } 806 807 Connection* oldest_conn = NULL; 808 uint32 oldest_time = 0xFFFFFFFF; 809 for (uint32 i = 0; i < connections_.size(); ++i) { 810 if (IsPingable(connections_[i])) { 811 if (connections_[i]->last_ping_sent() < oldest_time) { 812 oldest_time = connections_[i]->last_ping_sent(); 813 oldest_conn = connections_[i]; 814 } 815 } 816 } 817 return oldest_conn; 818 } 819 820 // return the number of "pingable" connections 821 uint32 P2PTransportChannel::NumPingableConnections() { 822 uint32 count = 0; 823 for (uint32 i = 0; i < connections_.size(); ++i) { 824 if (IsPingable(connections_[i])) 825 count += 1; 826 } 827 return count; 828 } 829 830 // When a connection's state changes, we need to figure out who to use as 831 // the best connection again. It could have become usable, or become unusable. 832 void P2PTransportChannel::OnConnectionStateChange(Connection *connection) { 833 ASSERT(worker_thread_ == talk_base::Thread::Current()); 834 835 // We have to unroll the stack before doing this because we may be changing 836 // the state of connections while sorting. 837 RequestSort(); 838 } 839 840 // When a connection is removed, edit it out, and then update our best 841 // connection. 842 void P2PTransportChannel::OnConnectionDestroyed(Connection *connection) { 843 ASSERT(worker_thread_ == talk_base::Thread::Current()); 844 845 // Note: the previous best_connection_ may be destroyed by now, so don't 846 // use it. 847 848 // Remove this connection from the list. 849 std::vector<Connection*>::iterator iter = 850 std::find(connections_.begin(), connections_.end(), connection); 851 ASSERT(iter != connections_.end()); 852 connections_.erase(iter); 853 854 LOG_J(LS_INFO, this) << "Removed connection (" 855 << static_cast<int>(connections_.size()) << " remaining)"; 856 857 // If this is currently the best connection, then we need to pick a new one. 858 // The call to SortConnections will pick a new one. It looks at the current 859 // best connection in order to avoid switching between fairly similar ones. 860 // Since this connection is no longer an option, we can just set best to NULL 861 // and re-choose a best assuming that there was no best connection. 862 if (best_connection_ == connection) { 863 SwitchBestConnectionTo(NULL); 864 RequestSort(); 865 } 866 } 867 868 // When a port is destroyed remove it from our list of ports to use for 869 // connection attempts. 870 void P2PTransportChannel::OnPortDestroyed(Port* port) { 871 ASSERT(worker_thread_ == talk_base::Thread::Current()); 872 873 // Remove this port from the list (if we didn't drop it already). 874 std::vector<Port*>::iterator iter = 875 std::find(ports_.begin(), ports_.end(), port); 876 if (iter != ports_.end()) 877 ports_.erase(iter); 878 879 LOG(INFO) << "Removed port from p2p socket: " 880 << static_cast<int>(ports_.size()) << " remaining"; 881 } 882 883 // We data is available, let listeners know 884 void P2PTransportChannel::OnReadPacket(Connection *connection, 885 const char *data, size_t len) { 886 ASSERT(worker_thread_ == talk_base::Thread::Current()); 887 888 // Let the client know of an incoming packet 889 890 SignalReadPacket(this, data, len); 891 } 892 893 // Set options on ourselves is simply setting options on all of our available 894 // port objects. 895 int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) { 896 OptionMap::iterator it = options_.find(opt); 897 if (it == options_.end()) { 898 options_.insert(std::make_pair(opt, value)); 899 } else if (it->second == value) { 900 return 0; 901 } else { 902 it->second = value; 903 } 904 905 for (uint32 i = 0; i < ports_.size(); ++i) { 906 int val = ports_[i]->SetOption(opt, value); 907 if (val < 0) { 908 // Because this also occurs deferred, probably no point in reporting an 909 // error 910 LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: " 911 << ports_[i]->GetError(); 912 } 913 } 914 return 0; 915 } 916 917 // When the signalling channel is ready, we can really kick off the allocator 918 void P2PTransportChannel::OnSignalingReady() { 919 if (waiting_for_signaling_) { 920 waiting_for_signaling_ = false; 921 AddAllocatorSession(allocator_->CreateSession(name(), content_type())); 922 thread()->PostDelayed(kAllocatePeriod, this, MSG_ALLOCATE); 923 } 924 } 925 926 } // namespace cricket 927