1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/p2ptransportchannel.h" 29 30 #include <set> 31 #include "talk/base/common.h" 32 #include "talk/base/crc32.h" 33 #include "talk/base/logging.h" 34 #include "talk/base/stringencode.h" 35 #include "talk/p2p/base/common.h" 36 #include "talk/p2p/base/relayport.h" // For RELAY_PORT_TYPE. 37 #include "talk/p2p/base/stunport.h" // For STUN_PORT_TYPE. 38 39 namespace { 40 41 // messages for queuing up work for ourselves 42 enum { 43 MSG_SORT = 1, 44 MSG_PING, 45 }; 46 47 // When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers) 48 // for pinging. When the socket is writable, we will use only 1 Kbps because 49 // we don't want to degrade the quality on a modem. These numbers should work 50 // well on a 28.8K modem, which is the slowest connection on which the voice 51 // quality is reasonable at all. 52 static const uint32 PING_PACKET_SIZE = 60 * 8; 53 static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480ms 54 static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000; // 50ms 55 56 // If there is a current writable connection, then we will also try hard to 57 // make sure it is pinged at this rate. 58 static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit 59 60 // The minimum improvement in RTT that justifies a switch. 61 static const double kMinImprovement = 10; 62 63 cricket::PortInterface::CandidateOrigin GetOrigin(cricket::PortInterface* port, 64 cricket::PortInterface* origin_port) { 65 if (!origin_port) 66 return cricket::PortInterface::ORIGIN_MESSAGE; 67 else if (port == origin_port) 68 return cricket::PortInterface::ORIGIN_THIS_PORT; 69 else 70 return cricket::PortInterface::ORIGIN_OTHER_PORT; 71 } 72 73 // Compares two connections based only on static information about them. 74 int CompareConnectionCandidates(cricket::Connection* a, 75 cricket::Connection* b) { 76 // Compare connection priority. Lower values get sorted last. 77 if (a->priority() > b->priority()) 78 return 1; 79 if (a->priority() < b->priority()) 80 return -1; 81 82 // If we're still tied at this point, prefer a younger generation. 83 return (a->remote_candidate().generation() + a->port()->generation()) - 84 (b->remote_candidate().generation() + b->port()->generation()); 85 } 86 87 // Compare two connections based on their writability and static preferences. 88 int CompareConnections(cricket::Connection *a, cricket::Connection *b) { 89 // Sort based on write-state. Better states have lower values. 90 if (a->write_state() < b->write_state()) 91 return 1; 92 if (a->write_state() > b->write_state()) 93 return -1; 94 95 // Compare the candidate information. 96 return CompareConnectionCandidates(a, b); 97 } 98 99 // Wraps the comparison connection into a less than operator that puts higher 100 // priority writable connections first. 101 class ConnectionCompare { 102 public: 103 bool operator()(const cricket::Connection *ca, 104 const cricket::Connection *cb) { 105 cricket::Connection* a = const_cast<cricket::Connection*>(ca); 106 cricket::Connection* b = const_cast<cricket::Connection*>(cb); 107 108 ASSERT(a->port()->IceProtocol() == b->port()->IceProtocol()); 109 110 // Compare first on writability and static preferences. 111 int cmp = CompareConnections(a, b); 112 if (cmp > 0) 113 return true; 114 if (cmp < 0) 115 return false; 116 117 // Otherwise, sort based on latency estimate. 118 return a->rtt() < b->rtt(); 119 120 // Should we bother checking for the last connection that last received 121 // data? It would help rendezvous on the connection that is also receiving 122 // packets. 123 // 124 // TODO: Yes we should definitely do this. The TCP protocol gains 125 // efficiency by being used bidirectionally, as opposed to two separate 126 // unidirectional streams. This test should probably occur before 127 // comparison of local prefs (assuming combined prefs are the same). We 128 // need to be careful though, not to bounce back and forth with both sides 129 // trying to rendevous with the other. 130 } 131 }; 132 133 // Determines whether we should switch between two connections, based first on 134 // static preferences and then (if those are equal) on latency estimates. 135 bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) { 136 if (a_conn == b_conn) 137 return false; 138 139 if (!a_conn || !b_conn) // don't think the latter should happen 140 return true; 141 142 int prefs_cmp = CompareConnections(a_conn, b_conn); 143 if (prefs_cmp < 0) 144 return true; 145 if (prefs_cmp > 0) 146 return false; 147 148 return b_conn->rtt() <= a_conn->rtt() + kMinImprovement; 149 } 150 151 } // unnamed namespace 152 153 namespace cricket { 154 155 P2PTransportChannel::P2PTransportChannel(const std::string& content_name, 156 int component, 157 P2PTransport* transport, 158 PortAllocator *allocator) : 159 TransportChannelImpl(content_name, component), 160 transport_(transport), 161 allocator_(allocator), 162 worker_thread_(talk_base::Thread::Current()), 163 incoming_only_(false), 164 waiting_for_signaling_(false), 165 error_(0), 166 best_connection_(NULL), 167 pending_best_connection_(NULL), 168 sort_dirty_(false), 169 was_writable_(false), 170 protocol_type_(ICEPROTO_HYBRID), 171 remote_ice_mode_(ICEMODE_FULL), 172 ice_role_(ICEROLE_UNKNOWN), 173 tiebreaker_(0), 174 remote_candidate_generation_(0) { 175 } 176 177 P2PTransportChannel::~P2PTransportChannel() { 178 ASSERT(worker_thread_ == talk_base::Thread::Current()); 179 180 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 181 delete allocator_sessions_[i]; 182 } 183 184 // Add the allocator session to our list so that we know which sessions 185 // are still active. 186 void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) { 187 session->set_generation(static_cast<uint32>(allocator_sessions_.size())); 188 allocator_sessions_.push_back(session); 189 190 // We now only want to apply new candidates that we receive to the ports 191 // created by this new session because these are replacing those of the 192 // previous sessions. 193 ports_.clear(); 194 195 session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady); 196 session->SignalCandidatesReady.connect( 197 this, &P2PTransportChannel::OnCandidatesReady); 198 session->SignalCandidatesAllocationDone.connect( 199 this, &P2PTransportChannel::OnCandidatesAllocationDone); 200 session->StartGettingPorts(); 201 } 202 203 void P2PTransportChannel::AddConnection(Connection* connection) { 204 connections_.push_back(connection); 205 connection->set_remote_ice_mode(remote_ice_mode_); 206 connection->SignalReadPacket.connect( 207 this, &P2PTransportChannel::OnReadPacket); 208 connection->SignalReadyToSend.connect( 209 this, &P2PTransportChannel::OnReadyToSend); 210 connection->SignalStateChange.connect( 211 this, &P2PTransportChannel::OnConnectionStateChange); 212 connection->SignalDestroyed.connect( 213 this, &P2PTransportChannel::OnConnectionDestroyed); 214 connection->SignalUseCandidate.connect( 215 this, &P2PTransportChannel::OnUseCandidate); 216 } 217 218 void P2PTransportChannel::SetIceRole(IceRole ice_role) { 219 ASSERT(worker_thread_ == talk_base::Thread::Current()); 220 if (ice_role_ != ice_role) { 221 ice_role_ = ice_role; 222 for (std::vector<PortInterface *>::iterator it = ports_.begin(); 223 it != ports_.end(); ++it) { 224 (*it)->SetIceRole(ice_role); 225 } 226 } 227 } 228 229 void P2PTransportChannel::SetIceTiebreaker(uint64 tiebreaker) { 230 ASSERT(worker_thread_ == talk_base::Thread::Current()); 231 if (!ports_.empty()) { 232 LOG(LS_ERROR) 233 << "Attempt to change tiebreaker after Port has been allocated."; 234 return; 235 } 236 237 tiebreaker_ = tiebreaker; 238 } 239 240 bool P2PTransportChannel::GetIceProtocolType(IceProtocolType* type) const { 241 *type = protocol_type_; 242 return true; 243 } 244 245 void P2PTransportChannel::SetIceProtocolType(IceProtocolType type) { 246 ASSERT(worker_thread_ == talk_base::Thread::Current()); 247 248 protocol_type_ = type; 249 for (std::vector<PortInterface *>::iterator it = ports_.begin(); 250 it != ports_.end(); ++it) { 251 (*it)->SetIceProtocolType(protocol_type_); 252 } 253 } 254 255 void P2PTransportChannel::SetIceCredentials(const std::string& ice_ufrag, 256 const std::string& ice_pwd) { 257 ASSERT(worker_thread_ == talk_base::Thread::Current()); 258 bool ice_restart = false; 259 if (!ice_ufrag_.empty() && !ice_pwd_.empty()) { 260 // Restart candidate allocation if there is any change in either 261 // ice ufrag or password. 262 ice_restart = (ice_ufrag_ != ice_ufrag) || (ice_pwd_!= ice_pwd); 263 } 264 265 ice_ufrag_ = ice_ufrag; 266 ice_pwd_ = ice_pwd; 267 268 if (ice_restart) { 269 // Restart candidate gathering. 270 Allocate(); 271 } 272 } 273 274 void P2PTransportChannel::SetRemoteIceCredentials(const std::string& ice_ufrag, 275 const std::string& ice_pwd) { 276 ASSERT(worker_thread_ == talk_base::Thread::Current()); 277 bool ice_restart = false; 278 if (!remote_ice_ufrag_.empty() && !remote_ice_pwd_.empty()) { 279 ice_restart = (remote_ice_ufrag_ != ice_ufrag) || 280 (remote_ice_pwd_!= ice_pwd); 281 } 282 283 remote_ice_ufrag_ = ice_ufrag; 284 remote_ice_pwd_ = ice_pwd; 285 286 if (ice_restart) { 287 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245. 288 // Therefore we need to keep track of the remote ice restart so 289 // newer connections are prioritized over the older. 290 ++remote_candidate_generation_; 291 } 292 } 293 294 void P2PTransportChannel::SetRemoteIceMode(IceMode mode) { 295 remote_ice_mode_ = mode; 296 } 297 298 // Go into the state of processing candidates, and running in general 299 void P2PTransportChannel::Connect() { 300 ASSERT(worker_thread_ == talk_base::Thread::Current()); 301 if (ice_ufrag_.empty() || ice_pwd_.empty()) { 302 ASSERT(false); 303 LOG(LS_ERROR) << "P2PTransportChannel::Connect: The ice_ufrag_ and the " 304 << "ice_pwd_ are not set."; 305 return; 306 } 307 308 // Kick off an allocator session 309 Allocate(); 310 311 // Start pinging as the ports come in. 312 thread()->Post(this, MSG_PING); 313 } 314 315 // Reset the socket, clear up any previous allocations and start over 316 void P2PTransportChannel::Reset() { 317 ASSERT(worker_thread_ == talk_base::Thread::Current()); 318 319 // Get rid of all the old allocators. This should clean up everything. 320 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) 321 delete allocator_sessions_[i]; 322 323 allocator_sessions_.clear(); 324 ports_.clear(); 325 connections_.clear(); 326 best_connection_ = NULL; 327 328 // Forget about all of the candidates we got before. 329 remote_candidates_.clear(); 330 331 // Revert to the initial state. 332 set_readable(false); 333 set_writable(false); 334 335 // Reinitialize the rest of our state. 336 waiting_for_signaling_ = false; 337 sort_dirty_ = false; 338 339 // If we allocated before, start a new one now. 340 if (transport_->connect_requested()) 341 Allocate(); 342 343 // Start pinging as the ports come in. 344 thread()->Clear(this); 345 thread()->Post(this, MSG_PING); 346 } 347 348 // A new port is available, attempt to make connections for it 349 void P2PTransportChannel::OnPortReady(PortAllocatorSession *session, 350 PortInterface* port) { 351 ASSERT(worker_thread_ == talk_base::Thread::Current()); 352 353 // Set in-effect options on the new port 354 for (OptionMap::const_iterator it = options_.begin(); 355 it != options_.end(); 356 ++it) { 357 int val = port->SetOption(it->first, it->second); 358 if (val < 0) { 359 LOG_J(LS_WARNING, port) << "SetOption(" << it->first 360 << ", " << it->second 361 << ") failed: " << port->GetError(); 362 } 363 } 364 365 // Remember the ports and candidates, and signal that candidates are ready. 366 // The session will handle this, and send an initiate/accept/modify message 367 // if one is pending. 368 369 port->SetIceProtocolType(protocol_type_); 370 port->SetIceRole(ice_role_); 371 port->SetIceTiebreaker(tiebreaker_); 372 ports_.push_back(port); 373 port->SignalUnknownAddress.connect( 374 this, &P2PTransportChannel::OnUnknownAddress); 375 port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed); 376 port->SignalRoleConflict.connect( 377 this, &P2PTransportChannel::OnRoleConflict); 378 379 // Attempt to create a connection from this new port to all of the remote 380 // candidates that we were given so far. 381 382 std::vector<RemoteCandidate>::iterator iter; 383 for (iter = remote_candidates_.begin(); iter != remote_candidates_.end(); 384 ++iter) { 385 CreateConnection(port, *iter, iter->origin_port(), false); 386 } 387 388 SortConnections(); 389 } 390 391 // A new candidate is available, let listeners know 392 void P2PTransportChannel::OnCandidatesReady( 393 PortAllocatorSession *session, const std::vector<Candidate>& candidates) { 394 ASSERT(worker_thread_ == talk_base::Thread::Current()); 395 for (size_t i = 0; i < candidates.size(); ++i) { 396 SignalCandidateReady(this, candidates[i]); 397 } 398 } 399 400 void P2PTransportChannel::OnCandidatesAllocationDone( 401 PortAllocatorSession* session) { 402 ASSERT(worker_thread_ == talk_base::Thread::Current()); 403 SignalCandidatesAllocationDone(this); 404 } 405 406 // Handle stun packets 407 void P2PTransportChannel::OnUnknownAddress( 408 PortInterface* port, 409 const talk_base::SocketAddress& address, ProtocolType proto, 410 IceMessage* stun_msg, const std::string &remote_username, 411 bool port_muxed) { 412 ASSERT(worker_thread_ == talk_base::Thread::Current()); 413 414 // Port has received a valid stun packet from an address that no Connection 415 // is currently available for. See if we already have a candidate with the 416 // address. If it isn't we need to create new candidate for it. 417 418 // Determine if the remote candidates use shared ufrag. 419 bool ufrag_per_port = false; 420 std::vector<RemoteCandidate>::iterator it; 421 if (remote_candidates_.size() > 0) { 422 it = remote_candidates_.begin(); 423 std::string username = it->username(); 424 for (; it != remote_candidates_.end(); ++it) { 425 if (it->username() != username) { 426 ufrag_per_port = true; 427 break; 428 } 429 } 430 } 431 432 const Candidate* candidate = NULL; 433 bool known_username = false; 434 std::string remote_password; 435 for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) { 436 if (it->username() == remote_username) { 437 remote_password = it->password(); 438 known_username = true; 439 if (ufrag_per_port || 440 (it->address() == address && 441 it->protocol() == ProtoToString(proto))) { 442 candidate = &(*it); 443 break; 444 } 445 // We don't want to break here because we may find a match of the address 446 // later. 447 } 448 } 449 450 if (!known_username) { 451 if (port_muxed) { 452 // When Ports are muxed, SignalUnknownAddress is delivered to all 453 // P2PTransportChannel belong to a session. Return from here will 454 // save us from sending stun binding error message from incorrect channel. 455 return; 456 } 457 // Don't know about this username, the request is bogus 458 // This sometimes happens if a binding response comes in before the ACCEPT 459 // message. It is totally valid; the retry state machine will try again. 460 port->SendBindingErrorResponse(stun_msg, address, 461 STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS); 462 return; 463 } 464 465 Candidate new_remote_candidate; 466 if (candidate != NULL) { 467 new_remote_candidate = *candidate; 468 if (ufrag_per_port) { 469 new_remote_candidate.set_address(address); 470 } 471 } else { 472 // Create a new candidate with this address. 473 std::string type; 474 if (port->IceProtocol() == ICEPROTO_RFC5245) { 475 type = PRFLX_PORT_TYPE; 476 } else { 477 // G-ICE doesn't support prflx candidate. 478 // We set candidate type to STUN_PORT_TYPE if the binding request comes 479 // from a relay port or the shared socket is used. Otherwise we use the 480 // port's type as the candidate type. 481 if (port->Type() == RELAY_PORT_TYPE || port->SharedSocket()) { 482 type = STUN_PORT_TYPE; 483 } else { 484 type = port->Type(); 485 } 486 } 487 488 std::string id = talk_base::CreateRandomString(8); 489 new_remote_candidate = Candidate( 490 id, component(), ProtoToString(proto), address, 491 0, remote_username, remote_password, type, 492 port->Network()->name(), 0U, 493 talk_base::ToString<uint32>(talk_base::ComputeCrc32(id))); 494 new_remote_candidate.set_priority( 495 new_remote_candidate.GetPriority(ICE_TYPE_PREFERENCE_SRFLX, 496 port->Network()->preference())); 497 } 498 499 if (port->IceProtocol() == ICEPROTO_RFC5245) { 500 // RFC 5245 501 // If the source transport address of the request does not match any 502 // existing remote candidates, it represents a new peer reflexive remote 503 // candidate. 504 505 // The priority of the candidate is set to the PRIORITY attribute 506 // from the request. 507 const StunUInt32Attribute* priority_attr = 508 stun_msg->GetUInt32(STUN_ATTR_PRIORITY); 509 if (!priority_attr) { 510 LOG(LS_WARNING) << "P2PTransportChannel::OnUnknownAddress - " 511 << "No STUN_ATTR_PRIORITY found in the " 512 << "stun request message"; 513 port->SendBindingErrorResponse(stun_msg, address, 514 STUN_ERROR_BAD_REQUEST, 515 STUN_ERROR_REASON_BAD_REQUEST); 516 return; 517 } 518 new_remote_candidate.set_priority(priority_attr->value()); 519 520 // RFC5245, the agent constructs a pair whose local candidate is equal to 521 // the transport address on which the STUN request was received, and a 522 // remote candidate equal to the source transport address where the 523 // request came from. 524 525 // There shouldn't be an existing connection with this remote address. 526 // When ports are muxed, this channel might get multiple unknown address 527 // signals. In that case if the connection is already exists, we should 528 // simply ignore the signal othewise send server error. 529 if (port->GetConnection(new_remote_candidate.address())) { 530 if (port_muxed) { 531 LOG(LS_INFO) << "Connection already exists for peer reflexive " 532 << "candidate: " << new_remote_candidate.ToString(); 533 return; 534 } else { 535 ASSERT(false); 536 port->SendBindingErrorResponse(stun_msg, address, 537 STUN_ERROR_SERVER_ERROR, 538 STUN_ERROR_REASON_SERVER_ERROR); 539 return; 540 } 541 } 542 543 Connection* connection = port->CreateConnection( 544 new_remote_candidate, cricket::PortInterface::ORIGIN_THIS_PORT); 545 if (!connection) { 546 ASSERT(false); 547 port->SendBindingErrorResponse(stun_msg, address, 548 STUN_ERROR_SERVER_ERROR, 549 STUN_ERROR_REASON_SERVER_ERROR); 550 return; 551 } 552 553 AddConnection(connection); 554 connection->ReceivedPing(); 555 556 // Send the pinger a successful stun response. 557 port->SendBindingResponse(stun_msg, address); 558 559 // Update the list of connections since we just added another. We do this 560 // after sending the response since it could (in principle) delete the 561 // connection in question. 562 SortConnections(); 563 } else { 564 // Check for connectivity to this address. Create connections 565 // to this address across all local ports. First, add this as a new remote 566 // address 567 if (!CreateConnections(new_remote_candidate, port, true)) { 568 // Hopefully this won't occur, because changing a destination address 569 // shouldn't cause a new connection to fail 570 ASSERT(false); 571 port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR, 572 STUN_ERROR_REASON_SERVER_ERROR); 573 return; 574 } 575 576 // Send the pinger a successful stun response. 577 port->SendBindingResponse(stun_msg, address); 578 579 // Update the list of connections since we just added another. We do this 580 // after sending the response since it could (in principle) delete the 581 // connection in question. 582 SortConnections(); 583 } 584 } 585 586 void P2PTransportChannel::OnRoleConflict(PortInterface* port) { 587 SignalRoleConflict(this); // STUN ping will be sent when SetRole is called 588 // from Transport. 589 } 590 591 // When the signalling channel is ready, we can really kick off the allocator 592 void P2PTransportChannel::OnSignalingReady() { 593 ASSERT(worker_thread_ == talk_base::Thread::Current()); 594 if (waiting_for_signaling_) { 595 waiting_for_signaling_ = false; 596 AddAllocatorSession(allocator_->CreateSession( 597 SessionId(), content_name(), component(), ice_ufrag_, ice_pwd_)); 598 } 599 } 600 601 void P2PTransportChannel::OnUseCandidate(Connection* conn) { 602 ASSERT(worker_thread_ == talk_base::Thread::Current()); 603 ASSERT(ice_role_ == ICEROLE_CONTROLLED); 604 ASSERT(protocol_type_ == ICEPROTO_RFC5245); 605 if (conn->write_state() == Connection::STATE_WRITABLE) { 606 if (best_connection_ != conn) { 607 pending_best_connection_ = NULL; 608 SwitchBestConnectionTo(conn); 609 // Now we have selected the best connection, time to prune other existing 610 // connections and update the read/write state of the channel. 611 RequestSort(); 612 } 613 } else { 614 pending_best_connection_ = conn; 615 } 616 } 617 618 void P2PTransportChannel::OnCandidate(const Candidate& candidate) { 619 ASSERT(worker_thread_ == talk_base::Thread::Current()); 620 621 // Create connections to this remote candidate. 622 CreateConnections(candidate, NULL, false); 623 624 // Resort the connections list, which may have new elements. 625 SortConnections(); 626 } 627 628 // Creates connections from all of the ports that we care about to the given 629 // remote candidate. The return value is true if we created a connection from 630 // the origin port. 631 bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, 632 PortInterface* origin_port, 633 bool readable) { 634 ASSERT(worker_thread_ == talk_base::Thread::Current()); 635 636 Candidate new_remote_candidate(remote_candidate); 637 new_remote_candidate.set_generation( 638 GetRemoteCandidateGeneration(remote_candidate)); 639 // ICE candidates don't need to have username and password set, but 640 // the code below this (specifically, ConnectionRequest::Prepare in 641 // port.cc) uses the remote candidates's username. So, we set it 642 // here. 643 if (remote_candidate.username().empty()) { 644 new_remote_candidate.set_username(remote_ice_ufrag_); 645 } 646 if (remote_candidate.password().empty()) { 647 new_remote_candidate.set_password(remote_ice_pwd_); 648 } 649 650 // If we've already seen the new remote candidate (in the current candidate 651 // generation), then we shouldn't try creating connections for it. 652 // We either already have a connection for it, or we previously created one 653 // and then later pruned it. If we don't return, the channel will again 654 // re-create any connections that were previously pruned, which will then 655 // immediately be re-pruned, churning the network for no purpose. 656 // This only applies to candidates received over signaling (i.e. origin_port 657 // is NULL). 658 if (!origin_port && IsDuplicateRemoteCandidate(new_remote_candidate)) { 659 // return true to indicate success, without creating any new connections. 660 return true; 661 } 662 663 // Add a new connection for this candidate to every port that allows such a 664 // connection (i.e., if they have compatible protocols) and that does not 665 // already have a connection to an equivalent candidate. We must be careful 666 // to make sure that the origin port is included, even if it was pruned, 667 // since that may be the only port that can create this connection. 668 bool created = false; 669 std::vector<PortInterface *>::reverse_iterator it; 670 for (it = ports_.rbegin(); it != ports_.rend(); ++it) { 671 if (CreateConnection(*it, new_remote_candidate, origin_port, readable)) { 672 if (*it == origin_port) 673 created = true; 674 } 675 } 676 677 if ((origin_port != NULL) && 678 std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) { 679 if (CreateConnection( 680 origin_port, new_remote_candidate, origin_port, readable)) 681 created = true; 682 } 683 684 // Remember this remote candidate so that we can add it to future ports. 685 RememberRemoteCandidate(new_remote_candidate, origin_port); 686 687 return created; 688 } 689 690 // Setup a connection object for the local and remote candidate combination. 691 // And then listen to connection object for changes. 692 bool P2PTransportChannel::CreateConnection(PortInterface* port, 693 const Candidate& remote_candidate, 694 PortInterface* origin_port, 695 bool readable) { 696 // Look for an existing connection with this remote address. If one is not 697 // found, then we can create a new connection for this address. 698 Connection* connection = port->GetConnection(remote_candidate.address()); 699 if (connection != NULL) { 700 // It is not legal to try to change any of the parameters of an existing 701 // connection; however, the other side can send a duplicate candidate. 702 if (!remote_candidate.IsEquivalent(connection->remote_candidate())) { 703 LOG(INFO) << "Attempt to change a remote candidate." 704 << " Existing remote candidate: " 705 << connection->remote_candidate().ToString() 706 << "New remote candidate: " 707 << remote_candidate.ToString(); 708 return false; 709 } 710 } else { 711 PortInterface::CandidateOrigin origin = GetOrigin(port, origin_port); 712 713 // Don't create connection if this is a candidate we received in a 714 // message and we are not allowed to make outgoing connections. 715 if (origin == cricket::PortInterface::ORIGIN_MESSAGE && incoming_only_) 716 return false; 717 718 connection = port->CreateConnection(remote_candidate, origin); 719 if (!connection) 720 return false; 721 722 AddConnection(connection); 723 724 LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", (" 725 << connections_.size() << " total)"; 726 } 727 728 // If we are readable, it is because we are creating this in response to a 729 // ping from the other side. This will cause the state to become readable. 730 if (readable) 731 connection->ReceivedPing(); 732 733 return true; 734 } 735 736 bool P2PTransportChannel::FindConnection( 737 cricket::Connection* connection) const { 738 std::vector<Connection*>::const_iterator citer = 739 std::find(connections_.begin(), connections_.end(), connection); 740 return citer != connections_.end(); 741 } 742 743 uint32 P2PTransportChannel::GetRemoteCandidateGeneration( 744 const Candidate& candidate) { 745 if (protocol_type_ == ICEPROTO_GOOGLE) { 746 // The Candidate.generation() can be trusted. Nothing needs to be done. 747 return candidate.generation(); 748 } 749 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245. 750 // Therefore we need to keep track of the remote ice restart so 751 // newer connections are prioritized over the older. 752 ASSERT(candidate.generation() == 0 || 753 candidate.generation() == remote_candidate_generation_); 754 return remote_candidate_generation_; 755 } 756 757 // Check if remote candidate is already cached. 758 bool P2PTransportChannel::IsDuplicateRemoteCandidate( 759 const Candidate& candidate) { 760 for (uint32 i = 0; i < remote_candidates_.size(); ++i) { 761 if (remote_candidates_[i].IsEquivalent(candidate)) { 762 return true; 763 } 764 } 765 return false; 766 } 767 768 // Maintain our remote candidate list, adding this new remote one. 769 void P2PTransportChannel::RememberRemoteCandidate( 770 const Candidate& remote_candidate, PortInterface* origin_port) { 771 // Remove any candidates whose generation is older than this one. The 772 // presence of a new generation indicates that the old ones are not useful. 773 uint32 i = 0; 774 while (i < remote_candidates_.size()) { 775 if (remote_candidates_[i].generation() < remote_candidate.generation()) { 776 LOG(INFO) << "Pruning candidate from old generation: " 777 << remote_candidates_[i].address().ToSensitiveString(); 778 remote_candidates_.erase(remote_candidates_.begin() + i); 779 } else { 780 i += 1; 781 } 782 } 783 784 // Make sure this candidate is not a duplicate. 785 if (IsDuplicateRemoteCandidate(remote_candidate)) { 786 LOG(INFO) << "Duplicate candidate: " << remote_candidate.ToString(); 787 return; 788 } 789 790 // Try this candidate for all future ports. 791 remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port)); 792 } 793 794 // Set options on ourselves is simply setting options on all of our available 795 // port objects. 796 int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) { 797 OptionMap::iterator it = options_.find(opt); 798 if (it == options_.end()) { 799 options_.insert(std::make_pair(opt, value)); 800 } else if (it->second == value) { 801 return 0; 802 } else { 803 it->second = value; 804 } 805 806 for (uint32 i = 0; i < ports_.size(); ++i) { 807 int val = ports_[i]->SetOption(opt, value); 808 if (val < 0) { 809 // Because this also occurs deferred, probably no point in reporting an 810 // error 811 LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: " 812 << ports_[i]->GetError(); 813 } 814 } 815 return 0; 816 } 817 818 // Send data to the other side, using our best connection. 819 int P2PTransportChannel::SendPacket(const char *data, size_t len, 820 const talk_base::PacketOptions& options, 821 int flags) { 822 ASSERT(worker_thread_ == talk_base::Thread::Current()); 823 if (flags != 0) { 824 error_ = EINVAL; 825 return -1; 826 } 827 if (best_connection_ == NULL) { 828 error_ = EWOULDBLOCK; 829 return -1; 830 } 831 832 int sent = best_connection_->Send(data, len, options); 833 if (sent <= 0) { 834 ASSERT(sent < 0); 835 error_ = best_connection_->GetError(); 836 } 837 return sent; 838 } 839 840 bool P2PTransportChannel::GetStats(ConnectionInfos *infos) { 841 ASSERT(worker_thread_ == talk_base::Thread::Current()); 842 // Gather connection infos. 843 infos->clear(); 844 845 std::vector<Connection *>::const_iterator it; 846 for (it = connections_.begin(); it != connections_.end(); ++it) { 847 Connection *connection = *it; 848 ConnectionInfo info; 849 info.best_connection = (best_connection_ == connection); 850 info.readable = 851 (connection->read_state() == Connection::STATE_READABLE); 852 info.writable = 853 (connection->write_state() == Connection::STATE_WRITABLE); 854 info.timeout = 855 (connection->write_state() == Connection::STATE_WRITE_TIMEOUT); 856 info.new_connection = !connection->reported(); 857 connection->set_reported(true); 858 info.rtt = connection->rtt(); 859 info.sent_total_bytes = connection->sent_total_bytes(); 860 info.sent_bytes_second = connection->sent_bytes_second(); 861 info.recv_total_bytes = connection->recv_total_bytes(); 862 info.recv_bytes_second = connection->recv_bytes_second(); 863 info.local_candidate = connection->local_candidate(); 864 info.remote_candidate = connection->remote_candidate(); 865 info.key = connection; 866 infos->push_back(info); 867 } 868 869 return true; 870 } 871 872 talk_base::DiffServCodePoint P2PTransportChannel::DefaultDscpValue() const { 873 OptionMap::const_iterator it = options_.find(talk_base::Socket::OPT_DSCP); 874 if (it == options_.end()) { 875 return talk_base::DSCP_NO_CHANGE; 876 } 877 return static_cast<talk_base::DiffServCodePoint> (it->second); 878 } 879 880 // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending) 881 void P2PTransportChannel::Allocate() { 882 // Time for a new allocator, lets make sure we have a signalling channel 883 // to communicate candidates through first. 884 waiting_for_signaling_ = true; 885 SignalRequestSignaling(this); 886 } 887 888 // Monitor connection states. 889 void P2PTransportChannel::UpdateConnectionStates() { 890 uint32 now = talk_base::Time(); 891 892 // We need to copy the list of connections since some may delete themselves 893 // when we call UpdateState. 894 for (uint32 i = 0; i < connections_.size(); ++i) 895 connections_[i]->UpdateState(now); 896 } 897 898 // Prepare for best candidate sorting. 899 void P2PTransportChannel::RequestSort() { 900 if (!sort_dirty_) { 901 worker_thread_->Post(this, MSG_SORT); 902 sort_dirty_ = true; 903 } 904 } 905 906 // Sort the available connections to find the best one. We also monitor 907 // the number of available connections and the current state. 908 void P2PTransportChannel::SortConnections() { 909 ASSERT(worker_thread_ == talk_base::Thread::Current()); 910 911 // Make sure the connection states are up-to-date since this affects how they 912 // will be sorted. 913 UpdateConnectionStates(); 914 915 if (protocol_type_ == ICEPROTO_HYBRID) { 916 // If we are in hybrid mode, we are not sending any ping requests, so there 917 // is no point in sorting the connections. In hybrid state, ports can have 918 // different protocol than hybrid and protocol may differ from one another. 919 // Instead just update the state of this channel 920 UpdateChannelState(); 921 return; 922 } 923 924 // Any changes after this point will require a re-sort. 925 sort_dirty_ = false; 926 927 // Get a list of the networks that we are using. 928 std::set<talk_base::Network*> networks; 929 for (uint32 i = 0; i < connections_.size(); ++i) 930 networks.insert(connections_[i]->port()->Network()); 931 932 // Find the best alternative connection by sorting. It is important to note 933 // that amongst equal preference, writable connections, this will choose the 934 // one whose estimated latency is lowest. So it is the only one that we 935 // need to consider switching to. 936 937 ConnectionCompare cmp; 938 std::stable_sort(connections_.begin(), connections_.end(), cmp); 939 LOG(LS_VERBOSE) << "Sorting available connections:"; 940 for (uint32 i = 0; i < connections_.size(); ++i) { 941 LOG(LS_VERBOSE) << connections_[i]->ToString(); 942 } 943 944 Connection* top_connection = NULL; 945 if (connections_.size() > 0) 946 top_connection = connections_[0]; 947 948 // We don't want to pick the best connections if channel is using RFC5245 949 // and it's mode is CONTROLLED, as connections will be selected by the 950 // CONTROLLING agent. 951 952 // If necessary, switch to the new choice. 953 if (protocol_type_ != ICEPROTO_RFC5245 || ice_role_ == ICEROLE_CONTROLLING) { 954 if (ShouldSwitch(best_connection_, top_connection)) 955 SwitchBestConnectionTo(top_connection); 956 } 957 958 // We can prune any connection for which there is a writable connection on 959 // the same network with better or equal priority. We leave those with 960 // better priority just in case they become writable later (at which point, 961 // we would prune out the current best connection). We leave connections on 962 // other networks because they may not be using the same resources and they 963 // may represent very distinct paths over which we can switch. 964 std::set<talk_base::Network*>::iterator network; 965 for (network = networks.begin(); network != networks.end(); ++network) { 966 Connection* primier = GetBestConnectionOnNetwork(*network); 967 if (!primier || (primier->write_state() != Connection::STATE_WRITABLE)) 968 continue; 969 970 for (uint32 i = 0; i < connections_.size(); ++i) { 971 if ((connections_[i] != primier) && 972 (connections_[i]->port()->Network() == *network) && 973 (CompareConnectionCandidates(primier, connections_[i]) >= 0)) { 974 connections_[i]->Prune(); 975 } 976 } 977 } 978 979 // Check if all connections are timedout. 980 bool all_connections_timedout = true; 981 for (uint32 i = 0; i < connections_.size(); ++i) { 982 if (connections_[i]->write_state() != Connection::STATE_WRITE_TIMEOUT) { 983 all_connections_timedout = false; 984 break; 985 } 986 } 987 988 // Now update the writable state of the channel with the information we have 989 // so far. 990 if (best_connection_ && best_connection_->writable()) { 991 HandleWritable(); 992 } else if (all_connections_timedout) { 993 HandleAllTimedOut(); 994 } else { 995 HandleNotWritable(); 996 } 997 998 // Update the state of this channel. This method is called whenever the 999 // state of any connection changes, so this is a good place to do this. 1000 UpdateChannelState(); 1001 } 1002 1003 1004 // Track the best connection, and let listeners know 1005 void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) { 1006 // Note: if conn is NULL, the previous best_connection_ has been destroyed, 1007 // so don't use it. 1008 Connection* old_best_connection = best_connection_; 1009 best_connection_ = conn; 1010 if (best_connection_) { 1011 if (old_best_connection) { 1012 LOG_J(LS_INFO, this) << "Previous best connection: " 1013 << old_best_connection->ToString(); 1014 } 1015 LOG_J(LS_INFO, this) << "New best connection: " 1016 << best_connection_->ToString(); 1017 SignalRouteChange(this, best_connection_->remote_candidate()); 1018 } else { 1019 LOG_J(LS_INFO, this) << "No best connection"; 1020 } 1021 } 1022 1023 void P2PTransportChannel::UpdateChannelState() { 1024 // The Handle* functions already set the writable state. We'll just double- 1025 // check it here. 1026 bool writable = ((best_connection_ != NULL) && 1027 (best_connection_->write_state() == 1028 Connection::STATE_WRITABLE)); 1029 ASSERT(writable == this->writable()); 1030 if (writable != this->writable()) 1031 LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch"; 1032 1033 bool readable = false; 1034 for (uint32 i = 0; i < connections_.size(); ++i) { 1035 if (connections_[i]->read_state() == Connection::STATE_READABLE) { 1036 readable = true; 1037 break; 1038 } 1039 } 1040 set_readable(readable); 1041 } 1042 1043 // We checked the status of our connections and we had at least one that 1044 // was writable, go into the writable state. 1045 void P2PTransportChannel::HandleWritable() { 1046 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1047 if (!writable()) { 1048 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) { 1049 if (allocator_sessions_[i]->IsGettingPorts()) { 1050 allocator_sessions_[i]->StopGettingPorts(); 1051 } 1052 } 1053 } 1054 1055 was_writable_ = true; 1056 set_writable(true); 1057 } 1058 1059 // Notify upper layer about channel not writable state, if it was before. 1060 void P2PTransportChannel::HandleNotWritable() { 1061 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1062 if (was_writable_) { 1063 was_writable_ = false; 1064 set_writable(false); 1065 } 1066 } 1067 1068 void P2PTransportChannel::HandleAllTimedOut() { 1069 // Currently we are treating this as channel not writable. 1070 HandleNotWritable(); 1071 } 1072 1073 // If we have a best connection, return it, otherwise return top one in the 1074 // list (later we will mark it best). 1075 Connection* P2PTransportChannel::GetBestConnectionOnNetwork( 1076 talk_base::Network* network) { 1077 // If the best connection is on this network, then it wins. 1078 if (best_connection_ && (best_connection_->port()->Network() == network)) 1079 return best_connection_; 1080 1081 // Otherwise, we return the top-most in sorted order. 1082 for (uint32 i = 0; i < connections_.size(); ++i) { 1083 if (connections_[i]->port()->Network() == network) 1084 return connections_[i]; 1085 } 1086 1087 return NULL; 1088 } 1089 1090 // Handle any queued up requests 1091 void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) { 1092 switch (pmsg->message_id) { 1093 case MSG_SORT: 1094 OnSort(); 1095 break; 1096 case MSG_PING: 1097 OnPing(); 1098 break; 1099 default: 1100 ASSERT(false); 1101 break; 1102 } 1103 } 1104 1105 // Handle queued up sort request 1106 void P2PTransportChannel::OnSort() { 1107 // Resort the connections based on the new statistics. 1108 SortConnections(); 1109 } 1110 1111 // Handle queued up ping request 1112 void P2PTransportChannel::OnPing() { 1113 // Make sure the states of the connections are up-to-date (since this affects 1114 // which ones are pingable). 1115 UpdateConnectionStates(); 1116 1117 // Find the oldest pingable connection and have it do a ping. 1118 Connection* conn = FindNextPingableConnection(); 1119 if (conn) 1120 PingConnection(conn); 1121 1122 // Post ourselves a message to perform the next ping. 1123 uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY; 1124 thread()->PostDelayed(delay, this, MSG_PING); 1125 } 1126 1127 // Is the connection in a state for us to even consider pinging the other side? 1128 bool P2PTransportChannel::IsPingable(Connection* conn) { 1129 // An unconnected connection cannot be written to at all, so pinging is out 1130 // of the question. 1131 if (!conn->connected()) 1132 return false; 1133 1134 if (writable()) { 1135 // If we are writable, then we only want to ping connections that could be 1136 // better than this one, i.e., the ones that were not pruned. 1137 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT); 1138 } else { 1139 // If we are not writable, then we need to try everything that might work. 1140 // This includes both connections that do not have write timeout as well as 1141 // ones that do not have read timeout. A connection could be readable but 1142 // be in write-timeout if we pruned it before. Since the other side is 1143 // still pinging it, it very well might still work. 1144 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) || 1145 (conn->read_state() != Connection::STATE_READ_TIMEOUT); 1146 } 1147 } 1148 1149 // Returns the next pingable connection to ping. This will be the oldest 1150 // pingable connection unless we have a writable connection that is past the 1151 // maximum acceptable ping delay. 1152 Connection* P2PTransportChannel::FindNextPingableConnection() { 1153 uint32 now = talk_base::Time(); 1154 if (best_connection_ && 1155 (best_connection_->write_state() == Connection::STATE_WRITABLE) && 1156 (best_connection_->last_ping_sent() 1157 + MAX_CURRENT_WRITABLE_DELAY <= now)) { 1158 return best_connection_; 1159 } 1160 1161 Connection* oldest_conn = NULL; 1162 uint32 oldest_time = 0xFFFFFFFF; 1163 for (uint32 i = 0; i < connections_.size(); ++i) { 1164 if (IsPingable(connections_[i])) { 1165 if (connections_[i]->last_ping_sent() < oldest_time) { 1166 oldest_time = connections_[i]->last_ping_sent(); 1167 oldest_conn = connections_[i]; 1168 } 1169 } 1170 } 1171 return oldest_conn; 1172 } 1173 1174 // Apart from sending ping from |conn| this method also updates 1175 // |use_candidate_attr| flag. The criteria to update this flag is 1176 // explained below. 1177 // Set USE-CANDIDATE if doing ICE AND this channel is in CONTROLLING AND 1178 // a) Channel is in FULL ICE AND 1179 // a.1) |conn| is the best connection OR 1180 // a.2) there is no best connection OR 1181 // a.3) the best connection is unwritable OR 1182 // a.4) |conn| has higher priority than best_connection. 1183 // b) we're doing LITE ICE AND 1184 // b.1) |conn| is the best_connection AND 1185 // b.2) |conn| is writable. 1186 void P2PTransportChannel::PingConnection(Connection* conn) { 1187 bool use_candidate = false; 1188 if (protocol_type_ == ICEPROTO_RFC5245) { 1189 if (remote_ice_mode_ == ICEMODE_FULL && ice_role_ == ICEROLE_CONTROLLING) { 1190 use_candidate = (conn == best_connection_) || 1191 (best_connection_ == NULL) || 1192 (!best_connection_->writable()) || 1193 (conn->priority() > best_connection_->priority()); 1194 } else if (remote_ice_mode_ == ICEMODE_LITE && conn == best_connection_) { 1195 use_candidate = best_connection_->writable(); 1196 } 1197 } 1198 conn->set_use_candidate_attr(use_candidate); 1199 conn->Ping(talk_base::Time()); 1200 } 1201 1202 // When a connection's state changes, we need to figure out who to use as 1203 // the best connection again. It could have become usable, or become unusable. 1204 void P2PTransportChannel::OnConnectionStateChange(Connection* connection) { 1205 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1206 1207 // Update the best connection if the state change is from pending best 1208 // connection and role is controlled. 1209 if (protocol_type_ == ICEPROTO_RFC5245 && ice_role_ == ICEROLE_CONTROLLED) { 1210 if (connection == pending_best_connection_ && connection->writable()) { 1211 pending_best_connection_ = NULL; 1212 SwitchBestConnectionTo(connection); 1213 } 1214 } 1215 1216 // We have to unroll the stack before doing this because we may be changing 1217 // the state of connections while sorting. 1218 RequestSort(); 1219 } 1220 1221 // When a connection is removed, edit it out, and then update our best 1222 // connection. 1223 void P2PTransportChannel::OnConnectionDestroyed(Connection* connection) { 1224 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1225 1226 // Note: the previous best_connection_ may be destroyed by now, so don't 1227 // use it. 1228 1229 // Remove this connection from the list. 1230 std::vector<Connection*>::iterator iter = 1231 std::find(connections_.begin(), connections_.end(), connection); 1232 ASSERT(iter != connections_.end()); 1233 connections_.erase(iter); 1234 1235 LOG_J(LS_INFO, this) << "Removed connection (" 1236 << static_cast<int>(connections_.size()) << " remaining)"; 1237 1238 if (pending_best_connection_ == connection) { 1239 pending_best_connection_ = NULL; 1240 } 1241 1242 // If this is currently the best connection, then we need to pick a new one. 1243 // The call to SortConnections will pick a new one. It looks at the current 1244 // best connection in order to avoid switching between fairly similar ones. 1245 // Since this connection is no longer an option, we can just set best to NULL 1246 // and re-choose a best assuming that there was no best connection. 1247 if (best_connection_ == connection) { 1248 SwitchBestConnectionTo(NULL); 1249 RequestSort(); 1250 } 1251 1252 SignalConnectionRemoved(this); 1253 } 1254 1255 // When a port is destroyed remove it from our list of ports to use for 1256 // connection attempts. 1257 void P2PTransportChannel::OnPortDestroyed(PortInterface* port) { 1258 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1259 1260 // Remove this port from the list (if we didn't drop it already). 1261 std::vector<PortInterface*>::iterator iter = 1262 std::find(ports_.begin(), ports_.end(), port); 1263 if (iter != ports_.end()) 1264 ports_.erase(iter); 1265 1266 LOG(INFO) << "Removed port from p2p socket: " 1267 << static_cast<int>(ports_.size()) << " remaining"; 1268 } 1269 1270 // We data is available, let listeners know 1271 void P2PTransportChannel::OnReadPacket( 1272 Connection *connection, const char *data, size_t len, 1273 const talk_base::PacketTime& packet_time) { 1274 ASSERT(worker_thread_ == talk_base::Thread::Current()); 1275 1276 // Do not deliver, if packet doesn't belong to the correct transport channel. 1277 if (!FindConnection(connection)) 1278 return; 1279 1280 // Let the client know of an incoming packet 1281 SignalReadPacket(this, data, len, packet_time, 0); 1282 } 1283 1284 void P2PTransportChannel::OnReadyToSend(Connection* connection) { 1285 if (connection == best_connection_ && writable()) { 1286 SignalReadyToSend(this); 1287 } 1288 } 1289 1290 } // namespace cricket 1291