1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/client/basicportallocator.h" 29 30 #include <string> 31 #include <vector> 32 33 #include "talk/base/common.h" 34 #include "talk/base/helpers.h" 35 #include "talk/base/logging.h" 36 #include "talk/p2p/base/basicpacketsocketfactory.h" 37 #include "talk/p2p/base/common.h" 38 #include "talk/p2p/base/port.h" 39 #include "talk/p2p/base/relayport.h" 40 #include "talk/p2p/base/stunport.h" 41 #include "talk/p2p/base/tcpport.h" 42 #include "talk/p2p/base/turnport.h" 43 #include "talk/p2p/base/udpport.h" 44 45 using talk_base::CreateRandomId; 46 using talk_base::CreateRandomString; 47 48 namespace { 49 50 const uint32 MSG_CONFIG_START = 1; 51 const uint32 MSG_CONFIG_READY = 2; 52 const uint32 MSG_ALLOCATE = 3; 53 const uint32 MSG_ALLOCATION_PHASE = 4; 54 const uint32 MSG_SHAKE = 5; 55 const uint32 MSG_SEQUENCEOBJECTS_CREATED = 6; 56 const uint32 MSG_CONFIG_STOP = 7; 57 58 const uint32 ALLOCATE_DELAY = 250; 59 60 const int PHASE_UDP = 0; 61 const int PHASE_RELAY = 1; 62 const int PHASE_TCP = 2; 63 const int PHASE_SSLTCP = 3; 64 65 const int kNumPhases = 4; 66 67 const int SHAKE_MIN_DELAY = 45 * 1000; // 45 seconds 68 const int SHAKE_MAX_DELAY = 90 * 1000; // 90 seconds 69 70 int ShakeDelay() { 71 int range = SHAKE_MAX_DELAY - SHAKE_MIN_DELAY + 1; 72 return SHAKE_MIN_DELAY + CreateRandomId() % range; 73 } 74 75 } // namespace 76 77 namespace cricket { 78 79 const uint32 DISABLE_ALL_PHASES = 80 PORTALLOCATOR_DISABLE_UDP 81 | PORTALLOCATOR_DISABLE_TCP 82 | PORTALLOCATOR_DISABLE_STUN 83 | PORTALLOCATOR_DISABLE_RELAY; 84 85 // Performs the allocation of ports, in a sequenced (timed) manner, for a given 86 // network and IP address. 87 class AllocationSequence : public talk_base::MessageHandler, 88 public sigslot::has_slots<> { 89 public: 90 enum State { 91 kInit, // Initial state. 92 kRunning, // Started allocating ports. 93 kStopped, // Stopped from running. 94 kCompleted, // All ports are allocated. 95 96 // kInit --> kRunning --> {kCompleted|kStopped} 97 }; 98 99 AllocationSequence(BasicPortAllocatorSession* session, 100 talk_base::Network* network, 101 PortConfiguration* config, 102 uint32 flags); 103 ~AllocationSequence(); 104 bool Init(); 105 void Clear(); 106 107 State state() const { return state_; } 108 109 // Disables the phases for a new sequence that this one already covers for an 110 // equivalent network setup. 111 void DisableEquivalentPhases(talk_base::Network* network, 112 PortConfiguration* config, uint32* flags); 113 114 // Starts and stops the sequence. When started, it will continue allocating 115 // new ports on its own timed schedule. 116 void Start(); 117 void Stop(); 118 119 // MessageHandler 120 void OnMessage(talk_base::Message* msg); 121 122 void EnableProtocol(ProtocolType proto); 123 bool ProtocolEnabled(ProtocolType proto) const; 124 125 // Signal from AllocationSequence, when it's done with allocating ports. 126 // This signal is useful, when port allocation fails which doesn't result 127 // in any candidates. Using this signal BasicPortAllocatorSession can send 128 // its candidate discovery conclusion signal. Without this signal, 129 // BasicPortAllocatorSession doesn't have any event to trigger signal. This 130 // can also be achieved by starting timer in BPAS. 131 sigslot::signal1<AllocationSequence*> SignalPortAllocationComplete; 132 133 private: 134 typedef std::vector<ProtocolType> ProtocolList; 135 136 bool IsFlagSet(uint32 flag) { 137 return ((flags_ & flag) != 0); 138 } 139 void CreateUDPPorts(); 140 void CreateTCPPorts(); 141 void CreateStunPorts(); 142 void CreateRelayPorts(); 143 void CreateGturnPort(const RelayServerConfig& config); 144 void CreateTurnPort(const RelayServerConfig& config); 145 146 void OnReadPacket(talk_base::AsyncPacketSocket* socket, 147 const char* data, size_t size, 148 const talk_base::SocketAddress& remote_addr, 149 const talk_base::PacketTime& packet_time); 150 151 void OnPortDestroyed(PortInterface* port); 152 void OnResolvedTurnServerAddress( 153 TurnPort* port, const talk_base::SocketAddress& server_address, 154 const talk_base::SocketAddress& resolved_server_address); 155 156 BasicPortAllocatorSession* session_; 157 talk_base::Network* network_; 158 talk_base::IPAddress ip_; 159 PortConfiguration* config_; 160 State state_; 161 uint32 flags_; 162 ProtocolList protocols_; 163 talk_base::scoped_ptr<talk_base::AsyncPacketSocket> udp_socket_; 164 // There will be only one udp port per AllocationSequence. 165 UDPPort* udp_port_; 166 // Keeping a map for turn ports keyed with server addresses. 167 std::map<talk_base::SocketAddress, Port*> turn_ports_; 168 int phase_; 169 }; 170 171 // BasicPortAllocator 172 BasicPortAllocator::BasicPortAllocator( 173 talk_base::NetworkManager* network_manager, 174 talk_base::PacketSocketFactory* socket_factory) 175 : network_manager_(network_manager), 176 socket_factory_(socket_factory) { 177 ASSERT(socket_factory_ != NULL); 178 Construct(); 179 } 180 181 BasicPortAllocator::BasicPortAllocator( 182 talk_base::NetworkManager* network_manager) 183 : network_manager_(network_manager), 184 socket_factory_(NULL) { 185 Construct(); 186 } 187 188 BasicPortAllocator::BasicPortAllocator( 189 talk_base::NetworkManager* network_manager, 190 talk_base::PacketSocketFactory* socket_factory, 191 const talk_base::SocketAddress& stun_address) 192 : network_manager_(network_manager), 193 socket_factory_(socket_factory), 194 stun_address_(stun_address) { 195 ASSERT(socket_factory_ != NULL); 196 Construct(); 197 } 198 199 BasicPortAllocator::BasicPortAllocator( 200 talk_base::NetworkManager* network_manager, 201 const talk_base::SocketAddress& stun_address, 202 const talk_base::SocketAddress& relay_address_udp, 203 const talk_base::SocketAddress& relay_address_tcp, 204 const talk_base::SocketAddress& relay_address_ssl) 205 : network_manager_(network_manager), 206 socket_factory_(NULL), 207 stun_address_(stun_address) { 208 209 RelayServerConfig config(RELAY_GTURN); 210 if (!relay_address_udp.IsNil()) 211 config.ports.push_back(ProtocolAddress(relay_address_udp, PROTO_UDP)); 212 if (!relay_address_tcp.IsNil()) 213 config.ports.push_back(ProtocolAddress(relay_address_tcp, PROTO_TCP)); 214 if (!relay_address_ssl.IsNil()) 215 config.ports.push_back(ProtocolAddress(relay_address_ssl, PROTO_SSLTCP)); 216 217 if (!config.ports.empty()) 218 AddRelay(config); 219 220 Construct(); 221 } 222 223 void BasicPortAllocator::Construct() { 224 allow_tcp_listen_ = true; 225 } 226 227 BasicPortAllocator::~BasicPortAllocator() { 228 } 229 230 PortAllocatorSession *BasicPortAllocator::CreateSessionInternal( 231 const std::string& content_name, int component, 232 const std::string& ice_ufrag, const std::string& ice_pwd) { 233 return new BasicPortAllocatorSession(this, content_name, component, 234 ice_ufrag, ice_pwd); 235 } 236 237 // BasicPortAllocatorSession 238 BasicPortAllocatorSession::BasicPortAllocatorSession( 239 BasicPortAllocator *allocator, 240 const std::string& content_name, 241 int component, 242 const std::string& ice_ufrag, 243 const std::string& ice_pwd) 244 : PortAllocatorSession(content_name, component, 245 ice_ufrag, ice_pwd, allocator->flags()), 246 allocator_(allocator), network_thread_(NULL), 247 socket_factory_(allocator->socket_factory()), 248 allocation_started_(false), 249 network_manager_started_(false), 250 running_(false), 251 allocation_sequences_created_(false) { 252 allocator_->network_manager()->SignalNetworksChanged.connect( 253 this, &BasicPortAllocatorSession::OnNetworksChanged); 254 allocator_->network_manager()->StartUpdating(); 255 } 256 257 BasicPortAllocatorSession::~BasicPortAllocatorSession() { 258 allocator_->network_manager()->StopUpdating(); 259 if (network_thread_ != NULL) 260 network_thread_->Clear(this); 261 262 for (uint32 i = 0; i < sequences_.size(); ++i) { 263 // AllocationSequence should clear it's map entry for turn ports before 264 // ports are destroyed. 265 sequences_[i]->Clear(); 266 } 267 268 std::vector<PortData>::iterator it; 269 for (it = ports_.begin(); it != ports_.end(); it++) 270 delete it->port(); 271 272 for (uint32 i = 0; i < configs_.size(); ++i) 273 delete configs_[i]; 274 275 for (uint32 i = 0; i < sequences_.size(); ++i) 276 delete sequences_[i]; 277 } 278 279 void BasicPortAllocatorSession::StartGettingPorts() { 280 network_thread_ = talk_base::Thread::Current(); 281 if (!socket_factory_) { 282 owned_socket_factory_.reset( 283 new talk_base::BasicPacketSocketFactory(network_thread_)); 284 socket_factory_ = owned_socket_factory_.get(); 285 } 286 287 running_ = true; 288 network_thread_->Post(this, MSG_CONFIG_START); 289 290 if (flags() & PORTALLOCATOR_ENABLE_SHAKER) 291 network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE); 292 } 293 294 void BasicPortAllocatorSession::StopGettingPorts() { 295 ASSERT(talk_base::Thread::Current() == network_thread_); 296 running_ = false; 297 network_thread_->Clear(this, MSG_ALLOCATE); 298 for (uint32 i = 0; i < sequences_.size(); ++i) 299 sequences_[i]->Stop(); 300 network_thread_->Post(this, MSG_CONFIG_STOP); 301 } 302 303 void BasicPortAllocatorSession::OnMessage(talk_base::Message *message) { 304 switch (message->message_id) { 305 case MSG_CONFIG_START: 306 ASSERT(talk_base::Thread::Current() == network_thread_); 307 GetPortConfigurations(); 308 break; 309 310 case MSG_CONFIG_READY: 311 ASSERT(talk_base::Thread::Current() == network_thread_); 312 OnConfigReady(static_cast<PortConfiguration*>(message->pdata)); 313 break; 314 315 case MSG_ALLOCATE: 316 ASSERT(talk_base::Thread::Current() == network_thread_); 317 OnAllocate(); 318 break; 319 320 case MSG_SHAKE: 321 ASSERT(talk_base::Thread::Current() == network_thread_); 322 OnShake(); 323 break; 324 case MSG_SEQUENCEOBJECTS_CREATED: 325 ASSERT(talk_base::Thread::Current() == network_thread_); 326 OnAllocationSequenceObjectsCreated(); 327 break; 328 case MSG_CONFIG_STOP: 329 ASSERT(talk_base::Thread::Current() == network_thread_); 330 OnConfigStop(); 331 break; 332 default: 333 ASSERT(false); 334 } 335 } 336 337 void BasicPortAllocatorSession::GetPortConfigurations() { 338 PortConfiguration* config = new PortConfiguration(allocator_->stun_address(), 339 username(), 340 password()); 341 342 for (size_t i = 0; i < allocator_->relays().size(); ++i) { 343 config->AddRelay(allocator_->relays()[i]); 344 } 345 ConfigReady(config); 346 } 347 348 void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { 349 network_thread_->Post(this, MSG_CONFIG_READY, config); 350 } 351 352 // Adds a configuration to the list. 353 void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) { 354 if (config) 355 configs_.push_back(config); 356 357 AllocatePorts(); 358 } 359 360 void BasicPortAllocatorSession::OnConfigStop() { 361 ASSERT(talk_base::Thread::Current() == network_thread_); 362 363 // If any of the allocated ports have not completed the candidates allocation, 364 // mark those as error. Since session doesn't need any new candidates 365 // at this stage of the allocation, it's safe to discard any new candidates. 366 bool send_signal = false; 367 for (std::vector<PortData>::iterator it = ports_.begin(); 368 it != ports_.end(); ++it) { 369 if (!it->complete()) { 370 // Updating port state to error, which didn't finish allocating candidates 371 // yet. 372 it->set_error(); 373 send_signal = true; 374 } 375 } 376 377 // Did we stop any running sequences? 378 for (std::vector<AllocationSequence*>::iterator it = sequences_.begin(); 379 it != sequences_.end() && !send_signal; ++it) { 380 if ((*it)->state() == AllocationSequence::kStopped) { 381 send_signal = true; 382 } 383 } 384 385 // If we stopped anything that was running, send a done signal now. 386 if (send_signal) { 387 MaybeSignalCandidatesAllocationDone(); 388 } 389 } 390 391 void BasicPortAllocatorSession::AllocatePorts() { 392 ASSERT(talk_base::Thread::Current() == network_thread_); 393 network_thread_->Post(this, MSG_ALLOCATE); 394 } 395 396 void BasicPortAllocatorSession::OnAllocate() { 397 if (network_manager_started_) 398 DoAllocate(); 399 400 allocation_started_ = true; 401 if (running_) 402 network_thread_->PostDelayed(ALLOCATE_DELAY, this, MSG_ALLOCATE); 403 } 404 405 // For each network, see if we have a sequence that covers it already. If not, 406 // create a new sequence to create the appropriate ports. 407 void BasicPortAllocatorSession::DoAllocate() { 408 bool done_signal_needed = false; 409 std::vector<talk_base::Network*> networks; 410 allocator_->network_manager()->GetNetworks(&networks); 411 if (networks.empty()) { 412 LOG(LS_WARNING) << "Machine has no networks; no ports will be allocated"; 413 done_signal_needed = true; 414 } else { 415 for (uint32 i = 0; i < networks.size(); ++i) { 416 PortConfiguration* config = NULL; 417 if (configs_.size() > 0) 418 config = configs_.back(); 419 420 uint32 sequence_flags = flags(); 421 if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) { 422 // If all the ports are disabled we should just fire the allocation 423 // done event and return. 424 done_signal_needed = true; 425 break; 426 } 427 428 // Disables phases that are not specified in this config. 429 if (!config || config->stun_address.IsNil()) { 430 // No STUN ports specified in this config. 431 sequence_flags |= PORTALLOCATOR_DISABLE_STUN; 432 } 433 if (!config || config->relays.empty()) { 434 // No relay ports specified in this config. 435 sequence_flags |= PORTALLOCATOR_DISABLE_RELAY; 436 } 437 438 if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6) && 439 networks[i]->ip().family() == AF_INET6) { 440 // Skip IPv6 networks unless the flag's been set. 441 continue; 442 } 443 444 // Disable phases that would only create ports equivalent to 445 // ones that we have already made. 446 DisableEquivalentPhases(networks[i], config, &sequence_flags); 447 448 if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) { 449 // New AllocationSequence would have nothing to do, so don't make it. 450 continue; 451 } 452 453 AllocationSequence* sequence = 454 new AllocationSequence(this, networks[i], config, sequence_flags); 455 if (!sequence->Init()) { 456 delete sequence; 457 continue; 458 } 459 done_signal_needed = true; 460 sequence->SignalPortAllocationComplete.connect( 461 this, &BasicPortAllocatorSession::OnPortAllocationComplete); 462 if (running_) 463 sequence->Start(); 464 sequences_.push_back(sequence); 465 } 466 } 467 if (done_signal_needed) { 468 network_thread_->Post(this, MSG_SEQUENCEOBJECTS_CREATED); 469 } 470 } 471 472 void BasicPortAllocatorSession::OnNetworksChanged() { 473 network_manager_started_ = true; 474 if (allocation_started_) 475 DoAllocate(); 476 } 477 478 void BasicPortAllocatorSession::DisableEquivalentPhases( 479 talk_base::Network* network, PortConfiguration* config, uint32* flags) { 480 for (uint32 i = 0; i < sequences_.size() && 481 (*flags & DISABLE_ALL_PHASES) != DISABLE_ALL_PHASES; ++i) { 482 sequences_[i]->DisableEquivalentPhases(network, config, flags); 483 } 484 } 485 486 void BasicPortAllocatorSession::AddAllocatedPort(Port* port, 487 AllocationSequence * seq, 488 bool prepare_address) { 489 if (!port) 490 return; 491 492 LOG(LS_INFO) << "Adding allocated port for " << content_name(); 493 port->set_content_name(content_name()); 494 port->set_component(component_); 495 port->set_generation(generation()); 496 if (allocator_->proxy().type != talk_base::PROXY_NONE) 497 port->set_proxy(allocator_->user_agent(), allocator_->proxy()); 498 port->set_send_retransmit_count_attribute((allocator_->flags() & 499 PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0); 500 501 PortData data(port, seq); 502 ports_.push_back(data); 503 504 port->SignalCandidateReady.connect( 505 this, &BasicPortAllocatorSession::OnCandidateReady); 506 port->SignalPortComplete.connect(this, 507 &BasicPortAllocatorSession::OnPortComplete); 508 port->SignalDestroyed.connect(this, 509 &BasicPortAllocatorSession::OnPortDestroyed); 510 port->SignalPortError.connect( 511 this, &BasicPortAllocatorSession::OnPortError); 512 LOG_J(LS_INFO, port) << "Added port to allocator"; 513 514 if (prepare_address) 515 port->PrepareAddress(); 516 } 517 518 void BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated() { 519 allocation_sequences_created_ = true; 520 // Send candidate allocation complete signal if we have no sequences. 521 MaybeSignalCandidatesAllocationDone(); 522 } 523 524 void BasicPortAllocatorSession::OnCandidateReady( 525 Port* port, const Candidate& c) { 526 ASSERT(talk_base::Thread::Current() == network_thread_); 527 PortData* data = FindPort(port); 528 ASSERT(data != NULL); 529 // Discarding any candidate signal if port allocation status is 530 // already in completed state. 531 if (data->complete()) 532 return; 533 534 // Send candidates whose protocol is enabled. 535 std::vector<Candidate> candidates; 536 ProtocolType pvalue; 537 if (StringToProto(c.protocol().c_str(), &pvalue) && 538 data->sequence()->ProtocolEnabled(pvalue)) { 539 candidates.push_back(c); 540 } 541 542 if (!candidates.empty()) { 543 SignalCandidatesReady(this, candidates); 544 } 545 546 // Moving to READY state as we have atleast one candidate from the port. 547 // Since this port has atleast one candidate we should forward this port 548 // to listners, to allow connections from this port. 549 if (!data->ready()) { 550 data->set_ready(); 551 SignalPortReady(this, port); 552 } 553 } 554 555 void BasicPortAllocatorSession::OnPortComplete(Port* port) { 556 ASSERT(talk_base::Thread::Current() == network_thread_); 557 PortData* data = FindPort(port); 558 ASSERT(data != NULL); 559 560 // Ignore any late signals. 561 if (data->complete()) 562 return; 563 564 // Moving to COMPLETE state. 565 data->set_complete(); 566 // Send candidate allocation complete signal if this was the last port. 567 MaybeSignalCandidatesAllocationDone(); 568 } 569 570 void BasicPortAllocatorSession::OnPortError(Port* port) { 571 ASSERT(talk_base::Thread::Current() == network_thread_); 572 PortData* data = FindPort(port); 573 ASSERT(data != NULL); 574 // We might have already given up on this port and stopped it. 575 if (data->complete()) 576 return; 577 578 // SignalAddressError is currently sent from StunPort/TurnPort. 579 // But this signal itself is generic. 580 data->set_error(); 581 // Send candidate allocation complete signal if this was the last port. 582 MaybeSignalCandidatesAllocationDone(); 583 } 584 585 void BasicPortAllocatorSession::OnProtocolEnabled(AllocationSequence* seq, 586 ProtocolType proto) { 587 std::vector<Candidate> candidates; 588 for (std::vector<PortData>::iterator it = ports_.begin(); 589 it != ports_.end(); ++it) { 590 if (it->sequence() != seq) 591 continue; 592 593 const std::vector<Candidate>& potentials = it->port()->Candidates(); 594 for (size_t i = 0; i < potentials.size(); ++i) { 595 ProtocolType pvalue; 596 if (!StringToProto(potentials[i].protocol().c_str(), &pvalue)) 597 continue; 598 if (pvalue == proto) { 599 candidates.push_back(potentials[i]); 600 } 601 } 602 } 603 604 if (!candidates.empty()) { 605 SignalCandidatesReady(this, candidates); 606 } 607 } 608 609 void BasicPortAllocatorSession::OnPortAllocationComplete( 610 AllocationSequence* seq) { 611 // Send candidate allocation complete signal if all ports are done. 612 MaybeSignalCandidatesAllocationDone(); 613 } 614 615 void BasicPortAllocatorSession::MaybeSignalCandidatesAllocationDone() { 616 // Send signal only if all required AllocationSequence objects 617 // are created. 618 if (!allocation_sequences_created_) 619 return; 620 621 // Check that all port allocation sequences are complete. 622 for (std::vector<AllocationSequence*>::iterator it = sequences_.begin(); 623 it != sequences_.end(); ++it) { 624 if ((*it)->state() == AllocationSequence::kRunning) 625 return; 626 } 627 628 // If all allocated ports are in complete state, session must have got all 629 // expected candidates. Session will trigger candidates allocation complete 630 // signal. 631 for (std::vector<PortData>::iterator it = ports_.begin(); 632 it != ports_.end(); ++it) { 633 if (!it->complete()) 634 return; 635 } 636 LOG(LS_INFO) << "All candidates gathered for " << content_name_ << ":" 637 << component_ << ":" << generation(); 638 SignalCandidatesAllocationDone(this); 639 } 640 641 void BasicPortAllocatorSession::OnPortDestroyed( 642 PortInterface* port) { 643 ASSERT(talk_base::Thread::Current() == network_thread_); 644 for (std::vector<PortData>::iterator iter = ports_.begin(); 645 iter != ports_.end(); ++iter) { 646 if (port == iter->port()) { 647 ports_.erase(iter); 648 LOG_J(LS_INFO, port) << "Removed port from allocator (" 649 << static_cast<int>(ports_.size()) << " remaining)"; 650 return; 651 } 652 } 653 ASSERT(false); 654 } 655 656 void BasicPortAllocatorSession::OnShake() { 657 LOG(INFO) << ">>>>> SHAKE <<<<< >>>>> SHAKE <<<<< >>>>> SHAKE <<<<<"; 658 659 std::vector<Port*> ports; 660 std::vector<Connection*> connections; 661 662 for (size_t i = 0; i < ports_.size(); ++i) { 663 if (ports_[i].ready()) 664 ports.push_back(ports_[i].port()); 665 } 666 667 for (size_t i = 0; i < ports.size(); ++i) { 668 Port::AddressMap::const_iterator iter; 669 for (iter = ports[i]->connections().begin(); 670 iter != ports[i]->connections().end(); 671 ++iter) { 672 connections.push_back(iter->second); 673 } 674 } 675 676 LOG(INFO) << ">>>>> Destroying " << ports.size() << " ports and " 677 << connections.size() << " connections"; 678 679 for (size_t i = 0; i < connections.size(); ++i) 680 connections[i]->Destroy(); 681 682 if (running_ || (ports.size() > 0) || (connections.size() > 0)) 683 network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE); 684 } 685 686 BasicPortAllocatorSession::PortData* BasicPortAllocatorSession::FindPort( 687 Port* port) { 688 for (std::vector<PortData>::iterator it = ports_.begin(); 689 it != ports_.end(); ++it) { 690 if (it->port() == port) { 691 return &*it; 692 } 693 } 694 return NULL; 695 } 696 697 // AllocationSequence 698 699 AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session, 700 talk_base::Network* network, 701 PortConfiguration* config, 702 uint32 flags) 703 : session_(session), 704 network_(network), 705 ip_(network->ip()), 706 config_(config), 707 state_(kInit), 708 flags_(flags), 709 udp_socket_(), 710 udp_port_(NULL), 711 phase_(0) { 712 } 713 714 bool AllocationSequence::Init() { 715 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && 716 !IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_UFRAG)) { 717 LOG(LS_ERROR) << "Shared socket option can't be set without " 718 << "shared ufrag."; 719 ASSERT(false); 720 return false; 721 } 722 723 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) { 724 udp_socket_.reset(session_->socket_factory()->CreateUdpSocket( 725 talk_base::SocketAddress(ip_, 0), session_->allocator()->min_port(), 726 session_->allocator()->max_port())); 727 if (udp_socket_) { 728 udp_socket_->SignalReadPacket.connect( 729 this, &AllocationSequence::OnReadPacket); 730 } 731 // Continuing if |udp_socket_| is NULL, as local TCP and RelayPort using TCP 732 // are next available options to setup a communication channel. 733 } 734 return true; 735 } 736 737 void AllocationSequence::Clear() { 738 udp_port_ = NULL; 739 turn_ports_.clear(); 740 } 741 742 AllocationSequence::~AllocationSequence() { 743 session_->network_thread()->Clear(this); 744 } 745 746 void AllocationSequence::DisableEquivalentPhases(talk_base::Network* network, 747 PortConfiguration* config, uint32* flags) { 748 if (!((network == network_) && (ip_ == network->ip()))) { 749 // Different network setup; nothing is equivalent. 750 return; 751 } 752 753 // Else turn off the stuff that we've already got covered. 754 755 // Every config implicitly specifies local, so turn that off right away. 756 *flags |= PORTALLOCATOR_DISABLE_UDP; 757 *flags |= PORTALLOCATOR_DISABLE_TCP; 758 759 if (config_ && config) { 760 if (config_->stun_address == config->stun_address) { 761 // Already got this STUN server covered. 762 *flags |= PORTALLOCATOR_DISABLE_STUN; 763 } 764 if (!config_->relays.empty()) { 765 // Already got relays covered. 766 // NOTE: This will even skip a _different_ set of relay servers if we 767 // were to be given one, but that never happens in our codebase. Should 768 // probably get rid of the list in PortConfiguration and just keep a 769 // single relay server in each one. 770 *flags |= PORTALLOCATOR_DISABLE_RELAY; 771 } 772 } 773 } 774 775 void AllocationSequence::Start() { 776 state_ = kRunning; 777 session_->network_thread()->Post(this, MSG_ALLOCATION_PHASE); 778 } 779 780 void AllocationSequence::Stop() { 781 // If the port is completed, don't set it to stopped. 782 if (state_ == kRunning) { 783 state_ = kStopped; 784 session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); 785 } 786 } 787 788 void AllocationSequence::OnMessage(talk_base::Message* msg) { 789 ASSERT(talk_base::Thread::Current() == session_->network_thread()); 790 ASSERT(msg->message_id == MSG_ALLOCATION_PHASE); 791 792 const char* const PHASE_NAMES[kNumPhases] = { 793 "Udp", "Relay", "Tcp", "SslTcp" 794 }; 795 796 // Perform all of the phases in the current step. 797 LOG_J(LS_INFO, network_) << "Allocation Phase=" 798 << PHASE_NAMES[phase_]; 799 800 switch (phase_) { 801 case PHASE_UDP: 802 CreateUDPPorts(); 803 CreateStunPorts(); 804 EnableProtocol(PROTO_UDP); 805 break; 806 807 case PHASE_RELAY: 808 CreateRelayPorts(); 809 break; 810 811 case PHASE_TCP: 812 CreateTCPPorts(); 813 EnableProtocol(PROTO_TCP); 814 break; 815 816 case PHASE_SSLTCP: 817 state_ = kCompleted; 818 EnableProtocol(PROTO_SSLTCP); 819 break; 820 821 default: 822 ASSERT(false); 823 } 824 825 if (state() == kRunning) { 826 ++phase_; 827 session_->network_thread()->PostDelayed( 828 session_->allocator()->step_delay(), 829 this, MSG_ALLOCATION_PHASE); 830 } else { 831 // If all phases in AllocationSequence are completed, no allocation 832 // steps needed further. Canceling pending signal. 833 session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); 834 SignalPortAllocationComplete(this); 835 } 836 } 837 838 void AllocationSequence::EnableProtocol(ProtocolType proto) { 839 if (!ProtocolEnabled(proto)) { 840 protocols_.push_back(proto); 841 session_->OnProtocolEnabled(this, proto); 842 } 843 } 844 845 bool AllocationSequence::ProtocolEnabled(ProtocolType proto) const { 846 for (ProtocolList::const_iterator it = protocols_.begin(); 847 it != protocols_.end(); ++it) { 848 if (*it == proto) 849 return true; 850 } 851 return false; 852 } 853 854 void AllocationSequence::CreateUDPPorts() { 855 if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) { 856 LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping."; 857 return; 858 } 859 860 // TODO(mallinath) - Remove UDPPort creating socket after shared socket 861 // is enabled completely. 862 UDPPort* port = NULL; 863 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) { 864 port = UDPPort::Create(session_->network_thread(), 865 session_->socket_factory(), network_, 866 udp_socket_.get(), 867 session_->username(), session_->password()); 868 } else { 869 port = UDPPort::Create(session_->network_thread(), 870 session_->socket_factory(), 871 network_, ip_, 872 session_->allocator()->min_port(), 873 session_->allocator()->max_port(), 874 session_->username(), session_->password()); 875 } 876 877 if (port) { 878 // If shared socket is enabled, STUN candidate will be allocated by the 879 // UDPPort. 880 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) { 881 udp_port_ = port; 882 883 // If STUN is not disabled, setting stun server address to port. 884 if (!IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) { 885 // If config has stun_address, use it to get server reflexive candidate 886 // otherwise use first TURN server which supports UDP. 887 if (config_ && !config_->stun_address.IsNil()) { 888 LOG(LS_INFO) << "AllocationSequence: UDPPort will be handling the " 889 << "STUN candidate generation."; 890 port->set_server_addr(config_->stun_address); 891 } else if (config_ && 892 config_->SupportsProtocol(RELAY_TURN, PROTO_UDP)) { 893 port->set_server_addr(config_->GetFirstRelayServerAddress( 894 RELAY_TURN, PROTO_UDP)); 895 LOG(LS_INFO) << "AllocationSequence: TURN Server address will be " 896 << " used for generating STUN candidate."; 897 } 898 } 899 } 900 901 session_->AddAllocatedPort(port, this, true); 902 port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed); 903 } 904 } 905 906 void AllocationSequence::CreateTCPPorts() { 907 if (IsFlagSet(PORTALLOCATOR_DISABLE_TCP)) { 908 LOG(LS_VERBOSE) << "AllocationSequence: TCP ports disabled, skipping."; 909 return; 910 } 911 912 Port* port = TCPPort::Create(session_->network_thread(), 913 session_->socket_factory(), 914 network_, ip_, 915 session_->allocator()->min_port(), 916 session_->allocator()->max_port(), 917 session_->username(), session_->password(), 918 session_->allocator()->allow_tcp_listen()); 919 if (port) { 920 session_->AddAllocatedPort(port, this, true); 921 // Since TCPPort is not created using shared socket, |port| will not be 922 // added to the dequeue. 923 } 924 } 925 926 void AllocationSequence::CreateStunPorts() { 927 if (IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) { 928 LOG(LS_VERBOSE) << "AllocationSequence: STUN ports disabled, skipping."; 929 return; 930 } 931 932 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) { 933 return; 934 } 935 936 // If BasicPortAllocatorSession::OnAllocate left STUN ports enabled then we 937 // ought to have an address for them here. 938 ASSERT(config_ && !config_->stun_address.IsNil()); 939 if (!(config_ && !config_->stun_address.IsNil())) { 940 LOG(LS_WARNING) 941 << "AllocationSequence: No STUN server configured, skipping."; 942 return; 943 } 944 945 StunPort* port = StunPort::Create(session_->network_thread(), 946 session_->socket_factory(), 947 network_, ip_, 948 session_->allocator()->min_port(), 949 session_->allocator()->max_port(), 950 session_->username(), session_->password(), 951 config_->stun_address); 952 if (port) { 953 session_->AddAllocatedPort(port, this, true); 954 // Since StunPort is not created using shared socket, |port| will not be 955 // added to the dequeue. 956 } 957 } 958 959 void AllocationSequence::CreateRelayPorts() { 960 if (IsFlagSet(PORTALLOCATOR_DISABLE_RELAY)) { 961 LOG(LS_VERBOSE) << "AllocationSequence: Relay ports disabled, skipping."; 962 return; 963 } 964 965 // If BasicPortAllocatorSession::OnAllocate left relay ports enabled then we 966 // ought to have a relay list for them here. 967 ASSERT(config_ && !config_->relays.empty()); 968 if (!(config_ && !config_->relays.empty())) { 969 LOG(LS_WARNING) 970 << "AllocationSequence: No relay server configured, skipping."; 971 return; 972 } 973 974 PortConfiguration::RelayList::const_iterator relay; 975 for (relay = config_->relays.begin(); 976 relay != config_->relays.end(); ++relay) { 977 if (relay->type == RELAY_GTURN) { 978 CreateGturnPort(*relay); 979 } else if (relay->type == RELAY_TURN) { 980 CreateTurnPort(*relay); 981 } else { 982 ASSERT(false); 983 } 984 } 985 } 986 987 void AllocationSequence::CreateGturnPort(const RelayServerConfig& config) { 988 // TODO(mallinath) - Rename RelayPort to GTurnPort. 989 RelayPort* port = RelayPort::Create(session_->network_thread(), 990 session_->socket_factory(), 991 network_, ip_, 992 session_->allocator()->min_port(), 993 session_->allocator()->max_port(), 994 config_->username, config_->password); 995 if (port) { 996 // Since RelayPort is not created using shared socket, |port| will not be 997 // added to the dequeue. 998 // Note: We must add the allocated port before we add addresses because 999 // the latter will create candidates that need name and preference 1000 // settings. However, we also can't prepare the address (normally 1001 // done by AddAllocatedPort) until we have these addresses. So we 1002 // wait to do that until below. 1003 session_->AddAllocatedPort(port, this, false); 1004 1005 // Add the addresses of this protocol. 1006 PortList::const_iterator relay_port; 1007 for (relay_port = config.ports.begin(); 1008 relay_port != config.ports.end(); 1009 ++relay_port) { 1010 port->AddServerAddress(*relay_port); 1011 port->AddExternalAddress(*relay_port); 1012 } 1013 // Start fetching an address for this port. 1014 port->PrepareAddress(); 1015 } 1016 } 1017 1018 void AllocationSequence::CreateTurnPort(const RelayServerConfig& config) { 1019 PortList::const_iterator relay_port; 1020 for (relay_port = config.ports.begin(); 1021 relay_port != config.ports.end(); ++relay_port) { 1022 TurnPort* port = NULL; 1023 // Shared socket mode must be enabled only for UDP based ports. Hence 1024 // don't pass shared socket for ports which will create TCP sockets. 1025 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && 1026 relay_port->proto == PROTO_UDP) { 1027 port = TurnPort::Create(session_->network_thread(), 1028 session_->socket_factory(), 1029 network_, udp_socket_.get(), 1030 session_->username(), session_->password(), 1031 *relay_port, config.credentials); 1032 // If we are using shared socket for TURN and udp ports, we need to 1033 // find a way to demux the packets to the correct port when received. 1034 // Mapping against server_address is one way of doing this. When packet 1035 // is received the remote_address will be checked against the map. 1036 // If server address is not resolved, a signal will be sent from the port 1037 // after the address is resolved. The map entry will updated with the 1038 // resolved address when the signal is received from the port. 1039 if ((*relay_port).address.IsUnresolved()) { 1040 // If server address is not resolved then listen for signal from port. 1041 port->SignalResolvedServerAddress.connect( 1042 this, &AllocationSequence::OnResolvedTurnServerAddress); 1043 } 1044 turn_ports_[(*relay_port).address] = port; 1045 // Listen to the port destroyed signal, to allow AllocationSequence to 1046 // remove entrt from it's map. 1047 port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed); 1048 } else { 1049 port = TurnPort::Create(session_->network_thread(), 1050 session_->socket_factory(), 1051 network_, ip_, 1052 session_->allocator()->min_port(), 1053 session_->allocator()->max_port(), 1054 session_->username(), 1055 session_->password(), 1056 *relay_port, config.credentials); 1057 } 1058 ASSERT(port != NULL); 1059 session_->AddAllocatedPort(port, this, true); 1060 } 1061 } 1062 1063 void AllocationSequence::OnReadPacket( 1064 talk_base::AsyncPacketSocket* socket, const char* data, size_t size, 1065 const talk_base::SocketAddress& remote_addr, 1066 const talk_base::PacketTime& packet_time) { 1067 ASSERT(socket == udp_socket_.get()); 1068 // If the packet is received from one of the TURN server in the config, then 1069 // pass down the packet to that port, otherwise it will be handed down to 1070 // the local udp port. 1071 Port* port = NULL; 1072 std::map<talk_base::SocketAddress, Port*>::iterator iter = 1073 turn_ports_.find(remote_addr); 1074 if (iter != turn_ports_.end()) { 1075 port = iter->second; 1076 } else if (udp_port_) { 1077 port = udp_port_; 1078 } 1079 ASSERT(port != NULL); 1080 if (port) { 1081 port->HandleIncomingPacket(socket, data, size, remote_addr, packet_time); 1082 } 1083 } 1084 1085 void AllocationSequence::OnPortDestroyed(PortInterface* port) { 1086 if (udp_port_ == port) { 1087 udp_port_ = NULL; 1088 } else { 1089 std::map<talk_base::SocketAddress, Port*>::iterator iter; 1090 for (iter = turn_ports_.begin(); iter != turn_ports_.end(); ++iter) { 1091 if (iter->second == port) { 1092 turn_ports_.erase(iter); 1093 break; 1094 } 1095 } 1096 } 1097 } 1098 1099 void AllocationSequence::OnResolvedTurnServerAddress( 1100 TurnPort* port, const talk_base::SocketAddress& server_address, 1101 const talk_base::SocketAddress& resolved_server_address) { 1102 std::map<talk_base::SocketAddress, Port*>::iterator iter; 1103 iter = turn_ports_.find(server_address); 1104 if (iter == turn_ports_.end()) { 1105 LOG(LS_INFO) << "TurnPort entry is not found in the map."; 1106 return; 1107 } 1108 1109 ASSERT(iter->second == port); 1110 // Remove old entry and then insert using the resolved address as key. 1111 turn_ports_.erase(iter); 1112 turn_ports_[resolved_server_address] = port; 1113 } 1114 1115 // PortConfiguration 1116 PortConfiguration::PortConfiguration( 1117 const talk_base::SocketAddress& stun_address, 1118 const std::string& username, 1119 const std::string& password) 1120 : stun_address(stun_address), 1121 username(username), 1122 password(password) { 1123 } 1124 1125 void PortConfiguration::AddRelay(const RelayServerConfig& config) { 1126 relays.push_back(config); 1127 } 1128 1129 bool PortConfiguration::SupportsProtocol( 1130 const RelayServerConfig& relay, ProtocolType type) const { 1131 PortList::const_iterator relay_port; 1132 for (relay_port = relay.ports.begin(); 1133 relay_port != relay.ports.end(); 1134 ++relay_port) { 1135 if (relay_port->proto == type) 1136 return true; 1137 } 1138 return false; 1139 } 1140 1141 bool PortConfiguration::SupportsProtocol(RelayType turn_type, 1142 ProtocolType type) const { 1143 for (size_t i = 0; i < relays.size(); ++i) { 1144 if (relays[i].type == turn_type && 1145 SupportsProtocol(relays[i], type)) 1146 return true; 1147 } 1148 return false; 1149 } 1150 1151 talk_base::SocketAddress PortConfiguration::GetFirstRelayServerAddress( 1152 RelayType turn_type, ProtocolType type) const { 1153 for (size_t i = 0; i < relays.size(); ++i) { 1154 if (relays[i].type == turn_type && SupportsProtocol(relays[i], type)) { 1155 return relays[i].ports.front().address; 1156 } 1157 } 1158 return talk_base::SocketAddress(); 1159 } 1160 1161 } // namespace cricket 1162