1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/session.h" 29 30 #include "talk/base/bind.h" 31 #include "talk/base/common.h" 32 #include "talk/base/logging.h" 33 #include "talk/base/helpers.h" 34 #include "talk/base/scoped_ptr.h" 35 #include "talk/base/sslstreamadapter.h" 36 #include "talk/xmpp/constants.h" 37 #include "talk/xmpp/jid.h" 38 #include "talk/p2p/base/dtlstransport.h" 39 #include "talk/p2p/base/p2ptransport.h" 40 #include "talk/p2p/base/sessionclient.h" 41 #include "talk/p2p/base/transport.h" 42 #include "talk/p2p/base/transportchannelproxy.h" 43 #include "talk/p2p/base/transportinfo.h" 44 45 #include "talk/p2p/base/constants.h" 46 47 namespace cricket { 48 49 using talk_base::Bind; 50 51 bool BadMessage(const buzz::QName type, 52 const std::string& text, 53 MessageError* err) { 54 err->SetType(type); 55 err->SetText(text); 56 return false; 57 } 58 59 TransportProxy::~TransportProxy() { 60 for (ChannelMap::iterator iter = channels_.begin(); 61 iter != channels_.end(); ++iter) { 62 iter->second->SignalDestroyed(iter->second); 63 delete iter->second; 64 } 65 } 66 67 std::string TransportProxy::type() const { 68 return transport_->get()->type(); 69 } 70 71 TransportChannel* TransportProxy::GetChannel(int component) { 72 ASSERT(talk_base::Thread::Current() == worker_thread_); 73 return GetChannelProxy(component); 74 } 75 76 TransportChannel* TransportProxy::CreateChannel( 77 const std::string& name, int component) { 78 ASSERT(talk_base::Thread::Current() == worker_thread_); 79 ASSERT(GetChannel(component) == NULL); 80 ASSERT(!transport_->get()->HasChannel(component)); 81 82 // We always create a proxy in case we need to change out the transport later. 83 TransportChannelProxy* channel = 84 new TransportChannelProxy(content_name(), name, component); 85 channels_[component] = channel; 86 87 // If we're already negotiated, create an impl and hook it up to the proxy 88 // channel. If we're connecting, create an impl but don't hook it up yet. 89 if (negotiated_) { 90 SetupChannelProxy_w(component, channel); 91 } else if (connecting_) { 92 GetOrCreateChannelProxyImpl_w(component); 93 } 94 return channel; 95 } 96 97 bool TransportProxy::HasChannel(int component) { 98 return transport_->get()->HasChannel(component); 99 } 100 101 void TransportProxy::DestroyChannel(int component) { 102 ASSERT(talk_base::Thread::Current() == worker_thread_); 103 TransportChannel* channel = GetChannel(component); 104 if (channel) { 105 // If the state of TransportProxy is not NEGOTIATED 106 // then TransportChannelProxy and its impl are not 107 // connected. Both must be connected before 108 // deletion. 109 if (!negotiated_) { 110 SetupChannelProxy_w(component, GetChannelProxy(component)); 111 } 112 113 channels_.erase(component); 114 channel->SignalDestroyed(channel); 115 delete channel; 116 } 117 } 118 119 void TransportProxy::ConnectChannels() { 120 if (!connecting_) { 121 if (!negotiated_) { 122 for (ChannelMap::iterator iter = channels_.begin(); 123 iter != channels_.end(); ++iter) { 124 GetOrCreateChannelProxyImpl(iter->first); 125 } 126 } 127 connecting_ = true; 128 } 129 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we 130 // don't have any channels yet, so we need to allow this method to be called 131 // multiple times. Once we fix Transport, we can move this call inside the 132 // if (!connecting_) block. 133 transport_->get()->ConnectChannels(); 134 } 135 136 void TransportProxy::CompleteNegotiation() { 137 if (!negotiated_) { 138 for (ChannelMap::iterator iter = channels_.begin(); 139 iter != channels_.end(); ++iter) { 140 SetupChannelProxy(iter->first, iter->second); 141 } 142 negotiated_ = true; 143 } 144 } 145 146 void TransportProxy::AddSentCandidates(const Candidates& candidates) { 147 for (Candidates::const_iterator cand = candidates.begin(); 148 cand != candidates.end(); ++cand) { 149 sent_candidates_.push_back(*cand); 150 } 151 } 152 153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) { 154 for (Candidates::const_iterator cand = candidates.begin(); 155 cand != candidates.end(); ++cand) { 156 unsent_candidates_.push_back(*cand); 157 } 158 } 159 160 bool TransportProxy::GetChannelNameFromComponent( 161 int component, std::string* channel_name) const { 162 const TransportChannelProxy* channel = GetChannelProxy(component); 163 if (channel == NULL) { 164 return false; 165 } 166 167 *channel_name = channel->name(); 168 return true; 169 } 170 171 bool TransportProxy::GetComponentFromChannelName( 172 const std::string& channel_name, int* component) const { 173 const TransportChannelProxy* channel = GetChannelProxyByName(channel_name); 174 if (channel == NULL) { 175 return false; 176 } 177 178 *component = channel->component(); 179 return true; 180 } 181 182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const { 183 ChannelMap::const_iterator iter = channels_.find(component); 184 return (iter != channels_.end()) ? iter->second : NULL; 185 } 186 187 TransportChannelProxy* TransportProxy::GetChannelProxyByName( 188 const std::string& name) const { 189 for (ChannelMap::const_iterator iter = channels_.begin(); 190 iter != channels_.end(); 191 ++iter) { 192 if (iter->second->name() == name) { 193 return iter->second; 194 } 195 } 196 return NULL; 197 } 198 199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl( 200 int component) { 201 return worker_thread_->Invoke<TransportChannelImpl*>(Bind( 202 &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component)); 203 } 204 205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w( 206 int component) { 207 ASSERT(talk_base::Thread::Current() == worker_thread_); 208 TransportChannelImpl* impl = transport_->get()->GetChannel(component); 209 if (impl == NULL) { 210 impl = transport_->get()->CreateChannel(component); 211 } 212 return impl; 213 } 214 215 void TransportProxy::SetupChannelProxy( 216 int component, TransportChannelProxy* transproxy) { 217 worker_thread_->Invoke<void>(Bind( 218 &TransportProxy::SetupChannelProxy_w, this, component, transproxy)); 219 } 220 221 void TransportProxy::SetupChannelProxy_w( 222 int component, TransportChannelProxy* transproxy) { 223 ASSERT(talk_base::Thread::Current() == worker_thread_); 224 TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component); 225 ASSERT(impl != NULL); 226 transproxy->SetImplementation(impl); 227 } 228 229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy, 230 TransportChannelImpl* impl) { 231 worker_thread_->Invoke<void>(Bind( 232 &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl)); 233 } 234 235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy, 236 TransportChannelImpl* impl) { 237 ASSERT(talk_base::Thread::Current() == worker_thread_); 238 ASSERT(proxy != NULL); 239 proxy->SetImplementation(impl); 240 } 241 242 // This function muxes |this| onto |target| by repointing |this| at 243 // |target|'s transport and setting our TransportChannelProxies 244 // to point to |target|'s underlying implementations. 245 bool TransportProxy::SetupMux(TransportProxy* target) { 246 // Bail out if there's nothing to do. 247 if (transport_ == target->transport_) { 248 return true; 249 } 250 251 // Run through all channels and remove any non-rtp transport channels before 252 // setting target transport channels. 253 for (ChannelMap::const_iterator iter = channels_.begin(); 254 iter != channels_.end(); ++iter) { 255 if (!target->transport_->get()->HasChannel(iter->first)) { 256 // Remove if channel doesn't exist in |transport_|. 257 ReplaceChannelProxyImpl(iter->second, NULL); 258 } else { 259 // Replace the impl for all the TransportProxyChannels with the channels 260 // from |target|'s transport. Fail if there's not an exact match. 261 ReplaceChannelProxyImpl( 262 iter->second, target->transport_->get()->CreateChannel(iter->first)); 263 } 264 } 265 266 // Now replace our transport. Must happen afterwards because 267 // it deletes all impls as a side effect. 268 transport_ = target->transport_; 269 transport_->get()->SignalCandidatesReady.connect( 270 this, &TransportProxy::OnTransportCandidatesReady); 271 set_candidates_allocated(target->candidates_allocated()); 272 return true; 273 } 274 275 void TransportProxy::SetIceRole(IceRole role) { 276 transport_->get()->SetIceRole(role); 277 } 278 279 bool TransportProxy::SetLocalTransportDescription( 280 const TransportDescription& description, 281 ContentAction action, 282 std::string* error_desc) { 283 // If this is an answer, finalize the negotiation. 284 if (action == CA_ANSWER) { 285 CompleteNegotiation(); 286 } 287 bool result = transport_->get()->SetLocalTransportDescription(description, 288 action, 289 error_desc); 290 if (result) 291 local_description_set_ = true; 292 return result; 293 } 294 295 bool TransportProxy::SetRemoteTransportDescription( 296 const TransportDescription& description, 297 ContentAction action, 298 std::string* error_desc) { 299 // If this is an answer, finalize the negotiation. 300 if (action == CA_ANSWER) { 301 CompleteNegotiation(); 302 } 303 bool result = transport_->get()->SetRemoteTransportDescription(description, 304 action, 305 error_desc); 306 if (result) 307 remote_description_set_ = true; 308 return result; 309 } 310 311 void TransportProxy::OnSignalingReady() { 312 // If we're starting a new allocation sequence, reset our state. 313 set_candidates_allocated(false); 314 transport_->get()->OnSignalingReady(); 315 } 316 317 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates, 318 std::string* error) { 319 // Ensure the transport is negotiated before handling candidates. 320 // TODO(juberti): Remove this once everybody calls SetLocalTD. 321 CompleteNegotiation(); 322 323 // Verify each candidate before passing down to transport layer. 324 for (Candidates::const_iterator cand = candidates.begin(); 325 cand != candidates.end(); ++cand) { 326 if (!transport_->get()->VerifyCandidate(*cand, error)) 327 return false; 328 if (!HasChannel(cand->component())) { 329 *error = "Candidate has unknown component: " + cand->ToString() + 330 " for content: " + content_name_; 331 return false; 332 } 333 } 334 transport_->get()->OnRemoteCandidates(candidates); 335 return true; 336 } 337 338 void TransportProxy::SetIdentity( 339 talk_base::SSLIdentity* identity) { 340 transport_->get()->SetIdentity(identity); 341 } 342 343 std::string BaseSession::StateToString(State state) { 344 switch (state) { 345 case Session::STATE_INIT: 346 return "STATE_INIT"; 347 case Session::STATE_SENTINITIATE: 348 return "STATE_SENTINITIATE"; 349 case Session::STATE_RECEIVEDINITIATE: 350 return "STATE_RECEIVEDINITIATE"; 351 case Session::STATE_SENTPRACCEPT: 352 return "STATE_SENTPRACCEPT"; 353 case Session::STATE_SENTACCEPT: 354 return "STATE_SENTACCEPT"; 355 case Session::STATE_RECEIVEDPRACCEPT: 356 return "STATE_RECEIVEDPRACCEPT"; 357 case Session::STATE_RECEIVEDACCEPT: 358 return "STATE_RECEIVEDACCEPT"; 359 case Session::STATE_SENTMODIFY: 360 return "STATE_SENTMODIFY"; 361 case Session::STATE_RECEIVEDMODIFY: 362 return "STATE_RECEIVEDMODIFY"; 363 case Session::STATE_SENTREJECT: 364 return "STATE_SENTREJECT"; 365 case Session::STATE_RECEIVEDREJECT: 366 return "STATE_RECEIVEDREJECT"; 367 case Session::STATE_SENTREDIRECT: 368 return "STATE_SENTREDIRECT"; 369 case Session::STATE_SENTTERMINATE: 370 return "STATE_SENTTERMINATE"; 371 case Session::STATE_RECEIVEDTERMINATE: 372 return "STATE_RECEIVEDTERMINATE"; 373 case Session::STATE_INPROGRESS: 374 return "STATE_INPROGRESS"; 375 case Session::STATE_DEINIT: 376 return "STATE_DEINIT"; 377 default: 378 break; 379 } 380 return "STATE_" + talk_base::ToString(state); 381 } 382 383 BaseSession::BaseSession(talk_base::Thread* signaling_thread, 384 talk_base::Thread* worker_thread, 385 PortAllocator* port_allocator, 386 const std::string& sid, 387 const std::string& content_type, 388 bool initiator) 389 : state_(STATE_INIT), 390 error_(ERROR_NONE), 391 signaling_thread_(signaling_thread), 392 worker_thread_(worker_thread), 393 port_allocator_(port_allocator), 394 sid_(sid), 395 content_type_(content_type), 396 transport_type_(NS_GINGLE_P2P), 397 initiator_(initiator), 398 identity_(NULL), 399 local_description_(NULL), 400 remote_description_(NULL), 401 ice_tiebreaker_(talk_base::CreateRandomId64()), 402 role_switch_(false) { 403 ASSERT(signaling_thread->IsCurrent()); 404 } 405 406 BaseSession::~BaseSession() { 407 ASSERT(signaling_thread()->IsCurrent()); 408 409 ASSERT(state_ != STATE_DEINIT); 410 LogState(state_, STATE_DEINIT); 411 state_ = STATE_DEINIT; 412 SignalState(this, state_); 413 414 for (TransportMap::iterator iter = transports_.begin(); 415 iter != transports_.end(); ++iter) { 416 delete iter->second; 417 } 418 419 delete remote_description_; 420 delete local_description_; 421 } 422 423 bool BaseSession::SetIdentity(talk_base::SSLIdentity* identity) { 424 if (identity_) 425 return false; 426 identity_ = identity; 427 for (TransportMap::iterator iter = transports_.begin(); 428 iter != transports_.end(); ++iter) { 429 iter->second->SetIdentity(identity_); 430 } 431 return true; 432 } 433 434 bool BaseSession::PushdownTransportDescription(ContentSource source, 435 ContentAction action, 436 std::string* error_desc) { 437 if (source == CS_LOCAL) { 438 return PushdownLocalTransportDescription(local_description_, 439 action, 440 error_desc); 441 } 442 return PushdownRemoteTransportDescription(remote_description_, 443 action, 444 error_desc); 445 } 446 447 bool BaseSession::PushdownLocalTransportDescription( 448 const SessionDescription* sdesc, 449 ContentAction action, 450 std::string* error_desc) { 451 // Update the Transports with the right information, and trigger them to 452 // start connecting. 453 for (TransportMap::iterator iter = transports_.begin(); 454 iter != transports_.end(); ++iter) { 455 // If no transport info was in this session description, ret == false 456 // and we just skip this one. 457 TransportDescription tdesc; 458 bool ret = GetTransportDescription( 459 sdesc, iter->second->content_name(), &tdesc); 460 if (ret) { 461 if (!iter->second->SetLocalTransportDescription(tdesc, action, 462 error_desc)) { 463 return false; 464 } 465 466 iter->second->ConnectChannels(); 467 } 468 } 469 470 return true; 471 } 472 473 bool BaseSession::PushdownRemoteTransportDescription( 474 const SessionDescription* sdesc, 475 ContentAction action, 476 std::string* error_desc) { 477 // Update the Transports with the right information. 478 for (TransportMap::iterator iter = transports_.begin(); 479 iter != transports_.end(); ++iter) { 480 TransportDescription tdesc; 481 482 // If no transport info was in this session description, ret == false 483 // and we just skip this one. 484 bool ret = GetTransportDescription( 485 sdesc, iter->second->content_name(), &tdesc); 486 if (ret) { 487 if (!iter->second->SetRemoteTransportDescription(tdesc, action, 488 error_desc)) { 489 return false; 490 } 491 } 492 } 493 494 return true; 495 } 496 497 TransportChannel* BaseSession::CreateChannel(const std::string& content_name, 498 const std::string& channel_name, 499 int component) { 500 // We create the proxy "on demand" here because we need to support 501 // creating channels at any time, even before we send or receive 502 // initiate messages, which is before we create the transports. 503 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name); 504 return transproxy->CreateChannel(channel_name, component); 505 } 506 507 TransportChannel* BaseSession::GetChannel(const std::string& content_name, 508 int component) { 509 TransportProxy* transproxy = GetTransportProxy(content_name); 510 if (transproxy == NULL) 511 return NULL; 512 else 513 return transproxy->GetChannel(component); 514 } 515 516 void BaseSession::DestroyChannel(const std::string& content_name, 517 int component) { 518 TransportProxy* transproxy = GetTransportProxy(content_name); 519 ASSERT(transproxy != NULL); 520 transproxy->DestroyChannel(component); 521 } 522 523 TransportProxy* BaseSession::GetOrCreateTransportProxy( 524 const std::string& content_name) { 525 TransportProxy* transproxy = GetTransportProxy(content_name); 526 if (transproxy) 527 return transproxy; 528 529 Transport* transport = CreateTransport(content_name); 530 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED); 531 transport->SetIceTiebreaker(ice_tiebreaker_); 532 // TODO: Connect all the Transport signals to TransportProxy 533 // then to the BaseSession. 534 transport->SignalConnecting.connect( 535 this, &BaseSession::OnTransportConnecting); 536 transport->SignalWritableState.connect( 537 this, &BaseSession::OnTransportWritable); 538 transport->SignalRequestSignaling.connect( 539 this, &BaseSession::OnTransportRequestSignaling); 540 transport->SignalTransportError.connect( 541 this, &BaseSession::OnTransportSendError); 542 transport->SignalRouteChange.connect( 543 this, &BaseSession::OnTransportRouteChange); 544 transport->SignalCandidatesAllocationDone.connect( 545 this, &BaseSession::OnTransportCandidatesAllocationDone); 546 transport->SignalRoleConflict.connect( 547 this, &BaseSession::OnRoleConflict); 548 transport->SignalCompleted.connect( 549 this, &BaseSession::OnTransportCompleted); 550 transport->SignalFailed.connect( 551 this, &BaseSession::OnTransportFailed); 552 553 transproxy = new TransportProxy(worker_thread_, sid_, content_name, 554 new TransportWrapper(transport)); 555 transproxy->SignalCandidatesReady.connect( 556 this, &BaseSession::OnTransportProxyCandidatesReady); 557 if (identity_) 558 transproxy->SetIdentity(identity_); 559 transports_[content_name] = transproxy; 560 561 return transproxy; 562 } 563 564 Transport* BaseSession::GetTransport(const std::string& content_name) { 565 TransportProxy* transproxy = GetTransportProxy(content_name); 566 if (transproxy == NULL) 567 return NULL; 568 return transproxy->impl(); 569 } 570 571 TransportProxy* BaseSession::GetTransportProxy( 572 const std::string& content_name) { 573 TransportMap::iterator iter = transports_.find(content_name); 574 return (iter != transports_.end()) ? iter->second : NULL; 575 } 576 577 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) { 578 for (TransportMap::iterator iter = transports_.begin(); 579 iter != transports_.end(); ++iter) { 580 TransportProxy* transproxy = iter->second; 581 if (transproxy->impl() == transport) { 582 return transproxy; 583 } 584 } 585 return NULL; 586 } 587 588 TransportProxy* BaseSession::GetFirstTransportProxy() { 589 if (transports_.empty()) 590 return NULL; 591 return transports_.begin()->second; 592 } 593 594 void BaseSession::DestroyTransportProxy( 595 const std::string& content_name) { 596 TransportMap::iterator iter = transports_.find(content_name); 597 if (iter != transports_.end()) { 598 delete iter->second; 599 transports_.erase(content_name); 600 } 601 } 602 603 cricket::Transport* BaseSession::CreateTransport( 604 const std::string& content_name) { 605 ASSERT(transport_type_ == NS_GINGLE_P2P); 606 return new cricket::DtlsTransport<P2PTransport>( 607 signaling_thread(), worker_thread(), content_name, 608 port_allocator(), identity_); 609 } 610 611 bool BaseSession::GetStats(SessionStats* stats) { 612 for (TransportMap::iterator iter = transports_.begin(); 613 iter != transports_.end(); ++iter) { 614 std::string proxy_id = iter->second->content_name(); 615 // We are ignoring not-yet-instantiated transports. 616 if (iter->second->impl()) { 617 std::string transport_id = iter->second->impl()->content_name(); 618 stats->proxy_to_transport[proxy_id] = transport_id; 619 if (stats->transport_stats.find(transport_id) 620 == stats->transport_stats.end()) { 621 TransportStats subinfos; 622 if (!iter->second->impl()->GetStats(&subinfos)) { 623 return false; 624 } 625 stats->transport_stats[transport_id] = subinfos; 626 } 627 } 628 } 629 return true; 630 } 631 632 void BaseSession::SetState(State state) { 633 ASSERT(signaling_thread_->IsCurrent()); 634 if (state != state_) { 635 LogState(state_, state); 636 state_ = state; 637 SignalState(this, state_); 638 signaling_thread_->Post(this, MSG_STATE); 639 } 640 SignalNewDescription(); 641 } 642 643 void BaseSession::SetError(Error error, const std::string& error_desc) { 644 ASSERT(signaling_thread_->IsCurrent()); 645 if (error != error_) { 646 error_ = error; 647 error_desc_ = error_desc; 648 SignalError(this, error); 649 } 650 } 651 652 void BaseSession::OnSignalingReady() { 653 ASSERT(signaling_thread()->IsCurrent()); 654 for (TransportMap::iterator iter = transports_.begin(); 655 iter != transports_.end(); ++iter) { 656 iter->second->OnSignalingReady(); 657 } 658 } 659 660 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to 661 // start, remove this method once everyone calls PushdownLocalTD. 662 void BaseSession::SpeculativelyConnectAllTransportChannels() { 663 // Put all transports into the connecting state. 664 for (TransportMap::iterator iter = transports_.begin(); 665 iter != transports_.end(); ++iter) { 666 iter->second->ConnectChannels(); 667 } 668 } 669 670 bool BaseSession::OnRemoteCandidates(const std::string& content_name, 671 const Candidates& candidates, 672 std::string* error) { 673 // Give candidates to the appropriate transport, and tell that transport 674 // to start connecting, if it's not already doing so. 675 TransportProxy* transproxy = GetTransportProxy(content_name); 676 if (!transproxy) { 677 *error = "Unknown content name " + content_name; 678 return false; 679 } 680 if (!transproxy->OnRemoteCandidates(candidates, error)) { 681 return false; 682 } 683 // TODO(juberti): Remove this call once we can be sure that we always have 684 // a local transport description (which will trigger the connection). 685 transproxy->ConnectChannels(); 686 return true; 687 } 688 689 bool BaseSession::MaybeEnableMuxingSupport() { 690 // We need both a local and remote description to decide if we should mux. 691 if ((state_ == STATE_SENTINITIATE || 692 state_ == STATE_RECEIVEDINITIATE) && 693 ((local_description_ == NULL) || 694 (remote_description_ == NULL))) { 695 return false; 696 } 697 698 // In order to perform the multiplexing, we need all proxies to be in the 699 // negotiated state, i.e. to have implementations underneath. 700 // Ensure that this is the case, regardless of whether we are going to mux. 701 for (TransportMap::iterator iter = transports_.begin(); 702 iter != transports_.end(); ++iter) { 703 ASSERT(iter->second->negotiated()); 704 if (!iter->second->negotiated()) 705 return false; 706 } 707 708 // If both sides agree to BUNDLE, mux all the specified contents onto the 709 // transport belonging to the first content name in the BUNDLE group. 710 // If the contents are already muxed, this will be a no-op. 711 // TODO(juberti): Should this check that local and remote have configured 712 // BUNDLE the same way? 713 bool candidates_allocated = IsCandidateAllocationDone(); 714 const ContentGroup* local_bundle_group = 715 local_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 716 const ContentGroup* remote_bundle_group = 717 remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 718 if (local_bundle_group && remote_bundle_group && 719 local_bundle_group->FirstContentName()) { 720 const std::string* content_name = local_bundle_group->FirstContentName(); 721 const ContentInfo* content = 722 local_description_->GetContentByName(*content_name); 723 ASSERT(content != NULL); 724 if (!SetSelectedProxy(content->name, local_bundle_group)) { 725 LOG(LS_WARNING) << "Failed to set up BUNDLE"; 726 return false; 727 } 728 729 // If we weren't done gathering before, we might be done now, as a result 730 // of enabling mux. 731 LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: " 732 << *content_name; 733 if (!candidates_allocated) { 734 MaybeCandidateAllocationDone(); 735 } 736 } else { 737 LOG(LS_INFO) << "No BUNDLE information, not bundling."; 738 } 739 return true; 740 } 741 742 bool BaseSession::SetSelectedProxy(const std::string& content_name, 743 const ContentGroup* muxed_group) { 744 TransportProxy* selected_proxy = GetTransportProxy(content_name); 745 if (!selected_proxy) { 746 return false; 747 } 748 749 ASSERT(selected_proxy->negotiated()); 750 for (TransportMap::iterator iter = transports_.begin(); 751 iter != transports_.end(); ++iter) { 752 // If content is part of the mux group, then repoint its proxy at the 753 // transport object that we have chosen to mux onto. If the proxy 754 // is already pointing at the right object, it will be a no-op. 755 if (muxed_group->HasContentName(iter->first) && 756 !iter->second->SetupMux(selected_proxy)) { 757 return false; 758 } 759 } 760 return true; 761 } 762 763 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) { 764 // TODO(juberti): This is a clunky way of processing the done signal. Instead, 765 // TransportProxy should receive the done signal directly, set its allocated 766 // flag internally, and then reissue the done signal to Session. 767 // Overall we should make TransportProxy receive *all* the signals from 768 // Transport, since this removes the need to manually iterate over all 769 // the transports, as is needed to make sure signals are handled properly 770 // when BUNDLEing. 771 // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways 772 // that make it prohibitively difficult to run dbg builds. Disabled for now. 773 //ASSERT(!IsCandidateAllocationDone()); 774 for (TransportMap::iterator iter = transports_.begin(); 775 iter != transports_.end(); ++iter) { 776 if (iter->second->impl() == transport) { 777 iter->second->set_candidates_allocated(true); 778 } 779 } 780 MaybeCandidateAllocationDone(); 781 } 782 783 bool BaseSession::IsCandidateAllocationDone() const { 784 for (TransportMap::const_iterator iter = transports_.begin(); 785 iter != transports_.end(); ++iter) { 786 if (!iter->second->candidates_allocated()) 787 return false; 788 } 789 return true; 790 } 791 792 void BaseSession::MaybeCandidateAllocationDone() { 793 if (IsCandidateAllocationDone()) { 794 LOG(LS_INFO) << "Candidate gathering is complete."; 795 OnCandidatesAllocationDone(); 796 } 797 } 798 799 void BaseSession::OnRoleConflict() { 800 if (role_switch_) { 801 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport."; 802 return; 803 } 804 805 role_switch_ = true; 806 for (TransportMap::iterator iter = transports_.begin(); 807 iter != transports_.end(); ++iter) { 808 // Role will be reverse of initial role setting. 809 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING; 810 iter->second->SetIceRole(role); 811 } 812 } 813 814 void BaseSession::LogState(State old_state, State new_state) { 815 LOG(LS_INFO) << "Session:" << id() 816 << " Old state:" << StateToString(old_state) 817 << " New state:" << StateToString(new_state) 818 << " Type:" << content_type() 819 << " Transport:" << transport_type(); 820 } 821 822 bool BaseSession::GetTransportDescription(const SessionDescription* description, 823 const std::string& content_name, 824 TransportDescription* tdesc) { 825 if (!description || !tdesc) { 826 return false; 827 } 828 const TransportInfo* transport_info = 829 description->GetTransportInfoByName(content_name); 830 if (!transport_info) { 831 return false; 832 } 833 *tdesc = transport_info->description; 834 return true; 835 } 836 837 void BaseSession::SignalNewDescription() { 838 ContentAction action; 839 ContentSource source; 840 if (!GetContentAction(&action, &source)) { 841 return; 842 } 843 if (source == CS_LOCAL) { 844 SignalNewLocalDescription(this, action); 845 } else { 846 SignalNewRemoteDescription(this, action); 847 } 848 } 849 850 bool BaseSession::GetContentAction(ContentAction* action, 851 ContentSource* source) { 852 switch (state_) { 853 // new local description 854 case STATE_SENTINITIATE: 855 *action = CA_OFFER; 856 *source = CS_LOCAL; 857 break; 858 case STATE_SENTPRACCEPT: 859 *action = CA_PRANSWER; 860 *source = CS_LOCAL; 861 break; 862 case STATE_SENTACCEPT: 863 *action = CA_ANSWER; 864 *source = CS_LOCAL; 865 break; 866 // new remote description 867 case STATE_RECEIVEDINITIATE: 868 *action = CA_OFFER; 869 *source = CS_REMOTE; 870 break; 871 case STATE_RECEIVEDPRACCEPT: 872 *action = CA_PRANSWER; 873 *source = CS_REMOTE; 874 break; 875 case STATE_RECEIVEDACCEPT: 876 *action = CA_ANSWER; 877 *source = CS_REMOTE; 878 break; 879 default: 880 return false; 881 } 882 return true; 883 } 884 885 void BaseSession::OnMessage(talk_base::Message *pmsg) { 886 switch (pmsg->message_id) { 887 case MSG_TIMEOUT: 888 // Session timeout has occured. 889 SetError(ERROR_TIME, "Session timeout has occured."); 890 break; 891 892 case MSG_STATE: 893 switch (state_) { 894 case STATE_SENTACCEPT: 895 case STATE_RECEIVEDACCEPT: 896 SetState(STATE_INPROGRESS); 897 break; 898 899 default: 900 // Explicitly ignoring some states here. 901 break; 902 } 903 break; 904 } 905 } 906 907 Session::Session(SessionManager* session_manager, 908 const std::string& local_name, 909 const std::string& initiator_name, 910 const std::string& sid, 911 const std::string& content_type, 912 SessionClient* client) 913 : BaseSession(session_manager->signaling_thread(), 914 session_manager->worker_thread(), 915 session_manager->port_allocator(), 916 sid, content_type, initiator_name == local_name) { 917 ASSERT(client != NULL); 918 session_manager_ = session_manager; 919 local_name_ = local_name; 920 initiator_name_ = initiator_name; 921 transport_parser_ = new P2PTransportParser(); 922 client_ = client; 923 initiate_acked_ = false; 924 current_protocol_ = PROTOCOL_HYBRID; 925 } 926 927 Session::~Session() { 928 delete transport_parser_; 929 } 930 931 bool Session::Initiate(const std::string &to, 932 const SessionDescription* sdesc) { 933 ASSERT(signaling_thread()->IsCurrent()); 934 SessionError error; 935 936 // Only from STATE_INIT 937 if (state() != STATE_INIT) 938 return false; 939 940 // Setup for signaling. 941 set_remote_name(to); 942 set_local_description(sdesc); 943 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()), 944 &error)) { 945 LOG(LS_ERROR) << "Could not create transports: " << error.text; 946 return false; 947 } 948 949 if (!SendInitiateMessage(sdesc, &error)) { 950 LOG(LS_ERROR) << "Could not send initiate message: " << error.text; 951 return false; 952 } 953 954 // We need to connect transport proxy and impl here so that we can process 955 // the TransportDescriptions. 956 SpeculativelyConnectAllTransportChannels(); 957 958 PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL); 959 SetState(Session::STATE_SENTINITIATE); 960 return true; 961 } 962 963 bool Session::Accept(const SessionDescription* sdesc) { 964 ASSERT(signaling_thread()->IsCurrent()); 965 966 // Only if just received initiate 967 if (state() != STATE_RECEIVEDINITIATE) 968 return false; 969 970 // Setup for signaling. 971 set_local_description(sdesc); 972 973 SessionError error; 974 if (!SendAcceptMessage(sdesc, &error)) { 975 LOG(LS_ERROR) << "Could not send accept message: " << error.text; 976 return false; 977 } 978 // TODO(juberti): Add BUNDLE support to transport-info messages. 979 PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL); 980 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 981 SetState(Session::STATE_SENTACCEPT); 982 return true; 983 } 984 985 bool Session::Reject(const std::string& reason) { 986 ASSERT(signaling_thread()->IsCurrent()); 987 988 // Reject is sent in response to an initiate or modify, to reject the 989 // request 990 if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY) 991 return false; 992 993 SessionError error; 994 if (!SendRejectMessage(reason, &error)) { 995 LOG(LS_ERROR) << "Could not send reject message: " << error.text; 996 return false; 997 } 998 999 SetState(STATE_SENTREJECT); 1000 return true; 1001 } 1002 1003 bool Session::TerminateWithReason(const std::string& reason) { 1004 ASSERT(signaling_thread()->IsCurrent()); 1005 1006 // Either side can terminate, at any time. 1007 switch (state()) { 1008 case STATE_SENTTERMINATE: 1009 case STATE_RECEIVEDTERMINATE: 1010 return false; 1011 1012 case STATE_SENTREJECT: 1013 case STATE_RECEIVEDREJECT: 1014 // We don't need to send terminate if we sent or received a reject... 1015 // it's implicit. 1016 break; 1017 1018 default: 1019 SessionError error; 1020 if (!SendTerminateMessage(reason, &error)) { 1021 LOG(LS_ERROR) << "Could not send terminate message: " << error.text; 1022 return false; 1023 } 1024 break; 1025 } 1026 1027 SetState(STATE_SENTTERMINATE); 1028 return true; 1029 } 1030 1031 bool Session::SendInfoMessage(const XmlElements& elems, 1032 const std::string& remote_name) { 1033 ASSERT(signaling_thread()->IsCurrent()); 1034 SessionError error; 1035 if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) { 1036 LOG(LS_ERROR) << "Could not send info message " << error.text; 1037 return false; 1038 } 1039 return true; 1040 } 1041 1042 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) { 1043 XmlElements elems; 1044 WriteError write_error; 1045 if (!WriteDescriptionInfo(current_protocol_, 1046 contents, 1047 GetContentParsers(), 1048 &elems, &write_error)) { 1049 LOG(LS_ERROR) << "Could not write description info message: " 1050 << write_error.text; 1051 return false; 1052 } 1053 SessionError error; 1054 if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) { 1055 LOG(LS_ERROR) << "Could not send description info message: " 1056 << error.text; 1057 return false; 1058 } 1059 return true; 1060 } 1061 1062 TransportInfos Session::GetEmptyTransportInfos( 1063 const ContentInfos& contents) const { 1064 TransportInfos tinfos; 1065 for (ContentInfos::const_iterator content = contents.begin(); 1066 content != contents.end(); ++content) { 1067 tinfos.push_back(TransportInfo(content->name, 1068 TransportDescription(transport_type(), 1069 std::string(), 1070 std::string()))); 1071 } 1072 return tinfos; 1073 } 1074 1075 bool Session::OnRemoteCandidates( 1076 const TransportInfos& tinfos, ParseError* error) { 1077 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1078 tinfo != tinfos.end(); ++tinfo) { 1079 std::string str_error; 1080 if (!BaseSession::OnRemoteCandidates( 1081 tinfo->content_name, tinfo->description.candidates, &str_error)) { 1082 return BadParse(str_error, error); 1083 } 1084 } 1085 return true; 1086 } 1087 1088 bool Session::CreateTransportProxies(const TransportInfos& tinfos, 1089 SessionError* error) { 1090 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1091 tinfo != tinfos.end(); ++tinfo) { 1092 if (tinfo->description.transport_type != transport_type()) { 1093 error->SetText("No supported transport in offer."); 1094 return false; 1095 } 1096 1097 GetOrCreateTransportProxy(tinfo->content_name); 1098 } 1099 return true; 1100 } 1101 1102 TransportParserMap Session::GetTransportParsers() { 1103 TransportParserMap parsers; 1104 parsers[transport_type()] = transport_parser_; 1105 return parsers; 1106 } 1107 1108 CandidateTranslatorMap Session::GetCandidateTranslators() { 1109 CandidateTranslatorMap translators; 1110 // NOTE: This technique makes it impossible to parse G-ICE 1111 // candidates in session-initiate messages because the channels 1112 // aren't yet created at that point. Since we don't use candidates 1113 // in session-initiate messages, we should be OK. Once we switch to 1114 // ICE, this translation shouldn't be necessary. 1115 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1116 iter != transport_proxies().end(); ++iter) { 1117 translators[iter->first] = iter->second; 1118 } 1119 return translators; 1120 } 1121 1122 ContentParserMap Session::GetContentParsers() { 1123 ContentParserMap parsers; 1124 parsers[content_type()] = client_; 1125 // We need to be able parse both RTP-based and SCTP-based Jingle 1126 // with the same client. 1127 if (content_type() == NS_JINGLE_RTP) { 1128 parsers[NS_JINGLE_DRAFT_SCTP] = client_; 1129 } 1130 return parsers; 1131 } 1132 1133 void Session::OnTransportRequestSignaling(Transport* transport) { 1134 ASSERT(signaling_thread()->IsCurrent()); 1135 TransportProxy* transproxy = GetTransportProxy(transport); 1136 ASSERT(transproxy != NULL); 1137 if (transproxy) { 1138 // Reset candidate allocation status for the transport proxy. 1139 transproxy->set_candidates_allocated(false); 1140 } 1141 SignalRequestSignaling(this); 1142 } 1143 1144 void Session::OnTransportConnecting(Transport* transport) { 1145 // This is an indication that we should begin watching the writability 1146 // state of the transport. 1147 OnTransportWritable(transport); 1148 } 1149 1150 void Session::OnTransportWritable(Transport* transport) { 1151 ASSERT(signaling_thread()->IsCurrent()); 1152 1153 // If the transport is not writable, start a timer to make sure that it 1154 // becomes writable within a reasonable amount of time. If it does not, we 1155 // terminate since we can't actually send data. If the transport is writable, 1156 // cancel the timer. Note that writability transitions may occur repeatedly 1157 // during the lifetime of the session. 1158 signaling_thread()->Clear(this, MSG_TIMEOUT); 1159 if (transport->HasChannels() && !transport->writable()) { 1160 signaling_thread()->PostDelayed( 1161 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); 1162 } 1163 } 1164 1165 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy, 1166 const Candidates& candidates) { 1167 ASSERT(signaling_thread()->IsCurrent()); 1168 if (transproxy != NULL) { 1169 if (initiator() && !initiate_acked_) { 1170 // TODO: This is to work around server re-ordering 1171 // messages. We send the candidates once the session-initiate 1172 // is acked. Once we have fixed the server to guarantee message 1173 // order, we can remove this case. 1174 transproxy->AddUnsentCandidates(candidates); 1175 } else { 1176 if (!transproxy->negotiated()) { 1177 transproxy->AddSentCandidates(candidates); 1178 } 1179 SessionError error; 1180 if (!SendTransportInfoMessage(transproxy, candidates, &error)) { 1181 LOG(LS_ERROR) << "Could not send transport info message: " 1182 << error.text; 1183 return; 1184 } 1185 } 1186 } 1187 } 1188 1189 void Session::OnTransportSendError(Transport* transport, 1190 const buzz::XmlElement* stanza, 1191 const buzz::QName& name, 1192 const std::string& type, 1193 const std::string& text, 1194 const buzz::XmlElement* extra_info) { 1195 ASSERT(signaling_thread()->IsCurrent()); 1196 SignalErrorMessage(this, stanza, name, type, text, extra_info); 1197 } 1198 1199 void Session::OnIncomingMessage(const SessionMessage& msg) { 1200 ASSERT(signaling_thread()->IsCurrent()); 1201 ASSERT(state() == STATE_INIT || msg.from == remote_name()); 1202 1203 if (current_protocol_== PROTOCOL_HYBRID) { 1204 if (msg.protocol == PROTOCOL_GINGLE) { 1205 current_protocol_ = PROTOCOL_GINGLE; 1206 } else { 1207 current_protocol_ = PROTOCOL_JINGLE; 1208 } 1209 } 1210 1211 bool valid = false; 1212 MessageError error; 1213 switch (msg.type) { 1214 case ACTION_SESSION_INITIATE: 1215 valid = OnInitiateMessage(msg, &error); 1216 break; 1217 case ACTION_SESSION_INFO: 1218 valid = OnInfoMessage(msg); 1219 break; 1220 case ACTION_SESSION_ACCEPT: 1221 valid = OnAcceptMessage(msg, &error); 1222 break; 1223 case ACTION_SESSION_REJECT: 1224 valid = OnRejectMessage(msg, &error); 1225 break; 1226 case ACTION_SESSION_TERMINATE: 1227 valid = OnTerminateMessage(msg, &error); 1228 break; 1229 case ACTION_TRANSPORT_INFO: 1230 valid = OnTransportInfoMessage(msg, &error); 1231 break; 1232 case ACTION_TRANSPORT_ACCEPT: 1233 valid = OnTransportAcceptMessage(msg, &error); 1234 break; 1235 case ACTION_DESCRIPTION_INFO: 1236 valid = OnDescriptionInfoMessage(msg, &error); 1237 break; 1238 default: 1239 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST, 1240 "unknown session message type", 1241 &error); 1242 } 1243 1244 if (valid) { 1245 SendAcknowledgementMessage(msg.stanza); 1246 } else { 1247 SignalErrorMessage(this, msg.stanza, error.type, 1248 "modify", error.text, NULL); 1249 } 1250 } 1251 1252 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza, 1253 const buzz::XmlElement* response_stanza, 1254 const SessionMessage& msg) { 1255 ASSERT(signaling_thread()->IsCurrent()); 1256 1257 if (msg.type == ACTION_SESSION_INITIATE) { 1258 OnInitiateAcked(); 1259 } 1260 } 1261 1262 void Session::OnInitiateAcked() { 1263 // TODO: This is to work around server re-ordering 1264 // messages. We send the candidates once the session-initiate 1265 // is acked. Once we have fixed the server to guarantee message 1266 // order, we can remove this case. 1267 if (!initiate_acked_) { 1268 initiate_acked_ = true; 1269 SessionError error; 1270 SendAllUnsentTransportInfoMessages(&error); 1271 } 1272 } 1273 1274 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza, 1275 const buzz::XmlElement* error_stanza) { 1276 ASSERT(signaling_thread()->IsCurrent()); 1277 1278 SessionMessage msg; 1279 ParseError parse_error; 1280 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) { 1281 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text 1282 << ":" << orig_stanza; 1283 return; 1284 } 1285 1286 // If the error is a session redirect, call OnRedirectError, which will 1287 // continue the session with a new remote JID. 1288 SessionRedirect redirect; 1289 if (FindSessionRedirect(error_stanza, &redirect)) { 1290 SessionError error; 1291 if (!OnRedirectError(redirect, &error)) { 1292 // TODO: Should we send a message back? The standard 1293 // says nothing about it. 1294 std::ostringstream desc; 1295 desc << "Failed to redirect: " << error.text; 1296 LOG(LS_ERROR) << desc.str(); 1297 SetError(ERROR_RESPONSE, desc.str()); 1298 } 1299 return; 1300 } 1301 1302 std::string error_type = "cancel"; 1303 1304 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR); 1305 if (error) { 1306 error_type = error->Attr(buzz::QN_TYPE); 1307 1308 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n" 1309 << "in response to:\n" << orig_stanza->Str(); 1310 } else { 1311 // don't crash if <error> is missing 1312 LOG(LS_ERROR) << "Session error without <error/> element, ignoring"; 1313 return; 1314 } 1315 1316 if (msg.type == ACTION_TRANSPORT_INFO) { 1317 // Transport messages frequently generate errors because they are sent right 1318 // when we detect a network failure. For that reason, we ignore such 1319 // errors, because if we do not establish writability again, we will 1320 // terminate anyway. The exceptions are transport-specific error tags, 1321 // which we pass on to the respective transport. 1322 } else if ((error_type != "continue") && (error_type != "wait")) { 1323 // We do not set an error if the other side said it is okay to continue 1324 // (possibly after waiting). These errors can be ignored. 1325 SetError(ERROR_RESPONSE, ""); 1326 } 1327 } 1328 1329 bool Session::OnInitiateMessage(const SessionMessage& msg, 1330 MessageError* error) { 1331 if (!CheckState(STATE_INIT, error)) 1332 return false; 1333 1334 SessionInitiate init; 1335 if (!ParseSessionInitiate(msg.protocol, msg.action_elem, 1336 GetContentParsers(), GetTransportParsers(), 1337 GetCandidateTranslators(), 1338 &init, error)) 1339 return false; 1340 1341 SessionError session_error; 1342 if (!CreateTransportProxies(init.transports, &session_error)) { 1343 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE, 1344 session_error.text, error); 1345 } 1346 1347 set_remote_name(msg.from); 1348 set_initiator_name(msg.initiator); 1349 set_remote_description(new SessionDescription(init.ClearContents(), 1350 init.transports, 1351 init.groups)); 1352 // Updating transport with TransportDescription. 1353 PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL); 1354 SetState(STATE_RECEIVEDINITIATE); 1355 1356 // Users of Session may listen to state change and call Reject(). 1357 if (state() != STATE_SENTREJECT) { 1358 if (!OnRemoteCandidates(init.transports, error)) 1359 return false; 1360 1361 // TODO(juberti): Auto-generate and push down the local transport answer. 1362 // This is necessary for trickling to work with RFC 5245 ICE. 1363 } 1364 return true; 1365 } 1366 1367 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) { 1368 if (!CheckState(STATE_SENTINITIATE, error)) 1369 return false; 1370 1371 SessionAccept accept; 1372 if (!ParseSessionAccept(msg.protocol, msg.action_elem, 1373 GetContentParsers(), GetTransportParsers(), 1374 GetCandidateTranslators(), 1375 &accept, error)) { 1376 return false; 1377 } 1378 1379 // If we get an accept, we can assume the initiate has been 1380 // received, even if we haven't gotten an IQ response. 1381 OnInitiateAcked(); 1382 1383 set_remote_description(new SessionDescription(accept.ClearContents(), 1384 accept.transports, 1385 accept.groups)); 1386 // Updating transport with TransportDescription. 1387 PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL); 1388 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 1389 SetState(STATE_RECEIVEDACCEPT); 1390 1391 if (!OnRemoteCandidates(accept.transports, error)) 1392 return false; 1393 1394 return true; 1395 } 1396 1397 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) { 1398 if (!CheckState(STATE_SENTINITIATE, error)) 1399 return false; 1400 1401 SetState(STATE_RECEIVEDREJECT); 1402 return true; 1403 } 1404 1405 bool Session::OnInfoMessage(const SessionMessage& msg) { 1406 SignalInfoMessage(this, msg.action_elem); 1407 return true; 1408 } 1409 1410 bool Session::OnTerminateMessage(const SessionMessage& msg, 1411 MessageError* error) { 1412 SessionTerminate term; 1413 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error)) 1414 return false; 1415 1416 SignalReceivedTerminateReason(this, term.reason); 1417 if (term.debug_reason != buzz::STR_EMPTY) { 1418 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason; 1419 } 1420 1421 SetState(STATE_RECEIVEDTERMINATE); 1422 return true; 1423 } 1424 1425 bool Session::OnTransportInfoMessage(const SessionMessage& msg, 1426 MessageError* error) { 1427 TransportInfos tinfos; 1428 if (!ParseTransportInfos(msg.protocol, msg.action_elem, 1429 initiator_description()->contents(), 1430 GetTransportParsers(), GetCandidateTranslators(), 1431 &tinfos, error)) 1432 return false; 1433 1434 if (!OnRemoteCandidates(tinfos, error)) 1435 return false; 1436 1437 return true; 1438 } 1439 1440 bool Session::OnTransportAcceptMessage(const SessionMessage& msg, 1441 MessageError* error) { 1442 // TODO: Currently here only for compatibility with 1443 // Gingle 1.1 clients (notably, Google Voice). 1444 return true; 1445 } 1446 1447 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg, 1448 MessageError* error) { 1449 if (!CheckState(STATE_INPROGRESS, error)) 1450 return false; 1451 1452 DescriptionInfo description_info; 1453 if (!ParseDescriptionInfo(msg.protocol, msg.action_elem, 1454 GetContentParsers(), GetTransportParsers(), 1455 GetCandidateTranslators(), 1456 &description_info, error)) { 1457 return false; 1458 } 1459 1460 ContentInfos& updated_contents = description_info.contents; 1461 1462 // TODO: Currently, reflector sends back 1463 // video stream updates even for an audio-only call, which causes 1464 // this to fail. Put this back once reflector is fixed. 1465 // 1466 // ContentInfos::iterator it; 1467 // First, ensure all updates are valid before modifying remote_description_. 1468 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1469 // if (remote_description()->GetContentByName(it->name) == NULL) { 1470 // return false; 1471 // } 1472 // } 1473 1474 // TODO: We used to replace contents from an update, but 1475 // that no longer works with partial updates. We need to figure out 1476 // a way to merge patial updates into contents. For now, users of 1477 // Session should listen to SignalRemoteDescriptionUpdate and handle 1478 // updates. They should not expect remote_description to be the 1479 // latest value. 1480 // 1481 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1482 // remote_description()->RemoveContentByName(it->name); 1483 // remote_description()->AddContent(it->name, it->type, it->description); 1484 // } 1485 // } 1486 1487 SignalRemoteDescriptionUpdate(this, updated_contents); 1488 return true; 1489 } 1490 1491 bool BareJidsEqual(const std::string& name1, 1492 const std::string& name2) { 1493 buzz::Jid jid1(name1); 1494 buzz::Jid jid2(name2); 1495 1496 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2); 1497 } 1498 1499 bool Session::OnRedirectError(const SessionRedirect& redirect, 1500 SessionError* error) { 1501 MessageError message_error; 1502 if (!CheckState(STATE_SENTINITIATE, &message_error)) { 1503 return BadWrite(message_error.text, error); 1504 } 1505 1506 if (!BareJidsEqual(remote_name(), redirect.target)) 1507 return BadWrite("Redirection not allowed: must be the same bare jid.", 1508 error); 1509 1510 // When we receive a redirect, we point the session at the new JID 1511 // and resend the candidates. 1512 set_remote_name(redirect.target); 1513 return (SendInitiateMessage(local_description(), error) && 1514 ResendAllTransportInfoMessages(error)); 1515 } 1516 1517 bool Session::CheckState(State expected, MessageError* error) { 1518 if (state() != expected) { 1519 // The server can deliver messages out of order/repeated for various 1520 // reasons. For example, if the server does not recive our iq response, 1521 // it could assume that the iq it sent was lost, and will then send 1522 // it again. Ideally, we should implement reliable messaging with 1523 // duplicate elimination. 1524 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED, 1525 "message not allowed in current state", 1526 error); 1527 } 1528 return true; 1529 } 1530 1531 void Session::SetError(Error error, const std::string& error_desc) { 1532 BaseSession::SetError(error, error_desc); 1533 if (error != ERROR_NONE) 1534 signaling_thread()->Post(this, MSG_ERROR); 1535 } 1536 1537 void Session::OnMessage(talk_base::Message* pmsg) { 1538 // preserve this because BaseSession::OnMessage may modify it 1539 State orig_state = state(); 1540 1541 BaseSession::OnMessage(pmsg); 1542 1543 switch (pmsg->message_id) { 1544 case MSG_ERROR: 1545 TerminateWithReason(STR_TERMINATE_ERROR); 1546 break; 1547 1548 case MSG_STATE: 1549 switch (orig_state) { 1550 case STATE_SENTREJECT: 1551 case STATE_RECEIVEDREJECT: 1552 // Assume clean termination. 1553 Terminate(); 1554 break; 1555 1556 case STATE_SENTTERMINATE: 1557 case STATE_RECEIVEDTERMINATE: 1558 session_manager_->DestroySession(this); 1559 break; 1560 1561 default: 1562 // Explicitly ignoring some states here. 1563 break; 1564 } 1565 break; 1566 } 1567 } 1568 1569 bool Session::SendInitiateMessage(const SessionDescription* sdesc, 1570 SessionError* error) { 1571 SessionInitiate init; 1572 init.contents = sdesc->contents(); 1573 init.transports = GetEmptyTransportInfos(init.contents); 1574 init.groups = sdesc->groups(); 1575 return SendMessage(ACTION_SESSION_INITIATE, init, error); 1576 } 1577 1578 bool Session::WriteSessionAction( 1579 SignalingProtocol protocol, const SessionInitiate& init, 1580 XmlElements* elems, WriteError* error) { 1581 return WriteSessionInitiate(protocol, init.contents, init.transports, 1582 GetContentParsers(), GetTransportParsers(), 1583 GetCandidateTranslators(), init.groups, 1584 elems, error); 1585 } 1586 1587 bool Session::SendAcceptMessage(const SessionDescription* sdesc, 1588 SessionError* error) { 1589 XmlElements elems; 1590 if (!WriteSessionAccept(current_protocol_, 1591 sdesc->contents(), 1592 GetEmptyTransportInfos(sdesc->contents()), 1593 GetContentParsers(), GetTransportParsers(), 1594 GetCandidateTranslators(), sdesc->groups(), 1595 &elems, error)) { 1596 return false; 1597 } 1598 return SendMessage(ACTION_SESSION_ACCEPT, elems, error); 1599 } 1600 1601 bool Session::SendRejectMessage(const std::string& reason, 1602 SessionError* error) { 1603 SessionTerminate term(reason); 1604 return SendMessage(ACTION_SESSION_REJECT, term, error); 1605 } 1606 1607 bool Session::SendTerminateMessage(const std::string& reason, 1608 SessionError* error) { 1609 SessionTerminate term(reason); 1610 return SendMessage(ACTION_SESSION_TERMINATE, term, error); 1611 } 1612 1613 bool Session::WriteSessionAction(SignalingProtocol protocol, 1614 const SessionTerminate& term, 1615 XmlElements* elems, WriteError* error) { 1616 WriteSessionTerminate(protocol, term, elems); 1617 return true; 1618 } 1619 1620 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo, 1621 SessionError* error) { 1622 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error); 1623 } 1624 1625 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy, 1626 const Candidates& candidates, 1627 SessionError* error) { 1628 return SendTransportInfoMessage(TransportInfo(transproxy->content_name(), 1629 TransportDescription(transproxy->type(), std::vector<std::string>(), 1630 std::string(), std::string(), ICEMODE_FULL, 1631 CONNECTIONROLE_NONE, NULL, candidates)), error); 1632 } 1633 1634 bool Session::WriteSessionAction(SignalingProtocol protocol, 1635 const TransportInfo& tinfo, 1636 XmlElements* elems, WriteError* error) { 1637 TransportInfos tinfos; 1638 tinfos.push_back(tinfo); 1639 return WriteTransportInfos(protocol, tinfos, 1640 GetTransportParsers(), GetCandidateTranslators(), 1641 elems, error); 1642 } 1643 1644 bool Session::ResendAllTransportInfoMessages(SessionError* error) { 1645 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1646 iter != transport_proxies().end(); ++iter) { 1647 TransportProxy* transproxy = iter->second; 1648 if (transproxy->sent_candidates().size() > 0) { 1649 if (!SendTransportInfoMessage( 1650 transproxy, transproxy->sent_candidates(), error)) { 1651 LOG(LS_ERROR) << "Could not resend transport info messages: " 1652 << error->text; 1653 return false; 1654 } 1655 transproxy->ClearSentCandidates(); 1656 } 1657 } 1658 return true; 1659 } 1660 1661 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) { 1662 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1663 iter != transport_proxies().end(); ++iter) { 1664 TransportProxy* transproxy = iter->second; 1665 if (transproxy->unsent_candidates().size() > 0) { 1666 if (!SendTransportInfoMessage( 1667 transproxy, transproxy->unsent_candidates(), error)) { 1668 LOG(LS_ERROR) << "Could not send unsent transport info messages: " 1669 << error->text; 1670 return false; 1671 } 1672 transproxy->ClearUnsentCandidates(); 1673 } 1674 } 1675 return true; 1676 } 1677 1678 bool Session::SendMessage(ActionType type, const XmlElements& action_elems, 1679 SessionError* error) { 1680 return SendMessage(type, action_elems, remote_name(), error); 1681 } 1682 1683 bool Session::SendMessage(ActionType type, const XmlElements& action_elems, 1684 const std::string& remote_name, SessionError* error) { 1685 talk_base::scoped_ptr<buzz::XmlElement> stanza( 1686 new buzz::XmlElement(buzz::QN_IQ)); 1687 1688 SessionMessage msg(current_protocol_, type, id(), initiator_name()); 1689 msg.to = remote_name; 1690 WriteSessionMessage(msg, action_elems, stanza.get()); 1691 1692 SignalOutgoingMessage(this, stanza.get()); 1693 return true; 1694 } 1695 1696 template <typename Action> 1697 bool Session::SendMessage(ActionType type, const Action& action, 1698 SessionError* error) { 1699 talk_base::scoped_ptr<buzz::XmlElement> stanza( 1700 new buzz::XmlElement(buzz::QN_IQ)); 1701 if (!WriteActionMessage(type, action, stanza.get(), error)) 1702 return false; 1703 1704 SignalOutgoingMessage(this, stanza.get()); 1705 return true; 1706 } 1707 1708 template <typename Action> 1709 bool Session::WriteActionMessage(ActionType type, const Action& action, 1710 buzz::XmlElement* stanza, 1711 WriteError* error) { 1712 if (current_protocol_ == PROTOCOL_HYBRID) { 1713 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error)) 1714 return false; 1715 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error)) 1716 return false; 1717 } else { 1718 if (!WriteActionMessage(current_protocol_, type, action, stanza, error)) 1719 return false; 1720 } 1721 return true; 1722 } 1723 1724 template <typename Action> 1725 bool Session::WriteActionMessage(SignalingProtocol protocol, 1726 ActionType type, const Action& action, 1727 buzz::XmlElement* stanza, WriteError* error) { 1728 XmlElements action_elems; 1729 if (!WriteSessionAction(protocol, action, &action_elems, error)) 1730 return false; 1731 1732 SessionMessage msg(protocol, type, id(), initiator_name()); 1733 msg.to = remote_name(); 1734 1735 WriteSessionMessage(msg, action_elems, stanza); 1736 return true; 1737 } 1738 1739 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) { 1740 talk_base::scoped_ptr<buzz::XmlElement> ack( 1741 new buzz::XmlElement(buzz::QN_IQ)); 1742 ack->SetAttr(buzz::QN_TO, remote_name()); 1743 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID)); 1744 ack->SetAttr(buzz::QN_TYPE, "result"); 1745 1746 SignalOutgoingMessage(this, ack.get()); 1747 } 1748 1749 } // namespace cricket 1750