1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/session.h" 29 30 #include "talk/p2p/base/dtlstransport.h" 31 #include "talk/p2p/base/p2ptransport.h" 32 #include "talk/p2p/base/sessionclient.h" 33 #include "talk/p2p/base/transport.h" 34 #include "talk/p2p/base/transportchannelproxy.h" 35 #include "talk/p2p/base/transportinfo.h" 36 #include "talk/xmpp/constants.h" 37 #include "talk/xmpp/jid.h" 38 #include "webrtc/base/bind.h" 39 #include "webrtc/base/common.h" 40 #include "webrtc/base/helpers.h" 41 #include "webrtc/base/logging.h" 42 #include "webrtc/base/scoped_ptr.h" 43 #include "webrtc/base/sslstreamadapter.h" 44 45 #include "talk/p2p/base/constants.h" 46 47 namespace cricket { 48 49 using rtc::Bind; 50 51 bool BadMessage(const buzz::QName type, 52 const std::string& text, 53 MessageError* err) { 54 err->SetType(type); 55 err->SetText(text); 56 return false; 57 } 58 59 TransportProxy::~TransportProxy() { 60 for (ChannelMap::iterator iter = channels_.begin(); 61 iter != channels_.end(); ++iter) { 62 iter->second->SignalDestroyed(iter->second); 63 delete iter->second; 64 } 65 } 66 67 const std::string& TransportProxy::type() const { 68 return transport_->get()->type(); 69 } 70 71 TransportChannel* TransportProxy::GetChannel(int component) { 72 ASSERT(rtc::Thread::Current() == worker_thread_); 73 return GetChannelProxy(component); 74 } 75 76 TransportChannel* TransportProxy::CreateChannel( 77 const std::string& name, int component) { 78 ASSERT(rtc::Thread::Current() == worker_thread_); 79 ASSERT(GetChannel(component) == NULL); 80 ASSERT(!transport_->get()->HasChannel(component)); 81 82 // We always create a proxy in case we need to change out the transport later. 83 TransportChannelProxy* channel = 84 new TransportChannelProxy(content_name(), name, component); 85 channels_[component] = channel; 86 87 // If we're already negotiated, create an impl and hook it up to the proxy 88 // channel. If we're connecting, create an impl but don't hook it up yet. 89 if (negotiated_) { 90 SetupChannelProxy_w(component, channel); 91 } else if (connecting_) { 92 GetOrCreateChannelProxyImpl_w(component); 93 } 94 return channel; 95 } 96 97 bool TransportProxy::HasChannel(int component) { 98 return transport_->get()->HasChannel(component); 99 } 100 101 void TransportProxy::DestroyChannel(int component) { 102 ASSERT(rtc::Thread::Current() == worker_thread_); 103 TransportChannel* channel = GetChannel(component); 104 if (channel) { 105 // If the state of TransportProxy is not NEGOTIATED 106 // then TransportChannelProxy and its impl are not 107 // connected. Both must be connected before 108 // deletion. 109 if (!negotiated_) { 110 SetupChannelProxy_w(component, GetChannelProxy(component)); 111 } 112 113 channels_.erase(component); 114 channel->SignalDestroyed(channel); 115 delete channel; 116 } 117 } 118 119 void TransportProxy::ConnectChannels() { 120 if (!connecting_) { 121 if (!negotiated_) { 122 for (ChannelMap::iterator iter = channels_.begin(); 123 iter != channels_.end(); ++iter) { 124 GetOrCreateChannelProxyImpl(iter->first); 125 } 126 } 127 connecting_ = true; 128 } 129 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we 130 // don't have any channels yet, so we need to allow this method to be called 131 // multiple times. Once we fix Transport, we can move this call inside the 132 // if (!connecting_) block. 133 transport_->get()->ConnectChannels(); 134 } 135 136 void TransportProxy::CompleteNegotiation() { 137 if (!negotiated_) { 138 for (ChannelMap::iterator iter = channels_.begin(); 139 iter != channels_.end(); ++iter) { 140 SetupChannelProxy(iter->first, iter->second); 141 } 142 negotiated_ = true; 143 } 144 } 145 146 void TransportProxy::AddSentCandidates(const Candidates& candidates) { 147 for (Candidates::const_iterator cand = candidates.begin(); 148 cand != candidates.end(); ++cand) { 149 sent_candidates_.push_back(*cand); 150 } 151 } 152 153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) { 154 for (Candidates::const_iterator cand = candidates.begin(); 155 cand != candidates.end(); ++cand) { 156 unsent_candidates_.push_back(*cand); 157 } 158 } 159 160 bool TransportProxy::GetChannelNameFromComponent( 161 int component, std::string* channel_name) const { 162 const TransportChannelProxy* channel = GetChannelProxy(component); 163 if (channel == NULL) { 164 return false; 165 } 166 167 *channel_name = channel->name(); 168 return true; 169 } 170 171 bool TransportProxy::GetComponentFromChannelName( 172 const std::string& channel_name, int* component) const { 173 const TransportChannelProxy* channel = GetChannelProxyByName(channel_name); 174 if (channel == NULL) { 175 return false; 176 } 177 178 *component = channel->component(); 179 return true; 180 } 181 182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const { 183 ChannelMap::const_iterator iter = channels_.find(component); 184 return (iter != channels_.end()) ? iter->second : NULL; 185 } 186 187 TransportChannelProxy* TransportProxy::GetChannelProxyByName( 188 const std::string& name) const { 189 for (ChannelMap::const_iterator iter = channels_.begin(); 190 iter != channels_.end(); 191 ++iter) { 192 if (iter->second->name() == name) { 193 return iter->second; 194 } 195 } 196 return NULL; 197 } 198 199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl( 200 int component) { 201 return worker_thread_->Invoke<TransportChannelImpl*>(Bind( 202 &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component)); 203 } 204 205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w( 206 int component) { 207 ASSERT(rtc::Thread::Current() == worker_thread_); 208 TransportChannelImpl* impl = transport_->get()->GetChannel(component); 209 if (impl == NULL) { 210 impl = transport_->get()->CreateChannel(component); 211 } 212 return impl; 213 } 214 215 void TransportProxy::SetupChannelProxy( 216 int component, TransportChannelProxy* transproxy) { 217 worker_thread_->Invoke<void>(Bind( 218 &TransportProxy::SetupChannelProxy_w, this, component, transproxy)); 219 } 220 221 void TransportProxy::SetupChannelProxy_w( 222 int component, TransportChannelProxy* transproxy) { 223 ASSERT(rtc::Thread::Current() == worker_thread_); 224 TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component); 225 ASSERT(impl != NULL); 226 transproxy->SetImplementation(impl); 227 } 228 229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy, 230 TransportChannelImpl* impl) { 231 worker_thread_->Invoke<void>(Bind( 232 &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl)); 233 } 234 235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy, 236 TransportChannelImpl* impl) { 237 ASSERT(rtc::Thread::Current() == worker_thread_); 238 ASSERT(proxy != NULL); 239 proxy->SetImplementation(impl); 240 } 241 242 // This function muxes |this| onto |target| by repointing |this| at 243 // |target|'s transport and setting our TransportChannelProxies 244 // to point to |target|'s underlying implementations. 245 bool TransportProxy::SetupMux(TransportProxy* target) { 246 // Bail out if there's nothing to do. 247 if (transport_ == target->transport_) { 248 return true; 249 } 250 251 // Run through all channels and remove any non-rtp transport channels before 252 // setting target transport channels. 253 for (ChannelMap::const_iterator iter = channels_.begin(); 254 iter != channels_.end(); ++iter) { 255 if (!target->transport_->get()->HasChannel(iter->first)) { 256 // Remove if channel doesn't exist in |transport_|. 257 ReplaceChannelProxyImpl(iter->second, NULL); 258 } else { 259 // Replace the impl for all the TransportProxyChannels with the channels 260 // from |target|'s transport. Fail if there's not an exact match. 261 ReplaceChannelProxyImpl( 262 iter->second, target->transport_->get()->CreateChannel(iter->first)); 263 } 264 } 265 266 // Now replace our transport. Must happen afterwards because 267 // it deletes all impls as a side effect. 268 transport_ = target->transport_; 269 transport_->get()->SignalCandidatesReady.connect( 270 this, &TransportProxy::OnTransportCandidatesReady); 271 set_candidates_allocated(target->candidates_allocated()); 272 return true; 273 } 274 275 void TransportProxy::SetIceRole(IceRole role) { 276 transport_->get()->SetIceRole(role); 277 } 278 279 bool TransportProxy::SetLocalTransportDescription( 280 const TransportDescription& description, 281 ContentAction action, 282 std::string* error_desc) { 283 // If this is an answer, finalize the negotiation. 284 if (action == CA_ANSWER) { 285 CompleteNegotiation(); 286 } 287 bool result = transport_->get()->SetLocalTransportDescription(description, 288 action, 289 error_desc); 290 if (result) 291 local_description_set_ = true; 292 return result; 293 } 294 295 bool TransportProxy::SetRemoteTransportDescription( 296 const TransportDescription& description, 297 ContentAction action, 298 std::string* error_desc) { 299 // If this is an answer, finalize the negotiation. 300 if (action == CA_ANSWER) { 301 CompleteNegotiation(); 302 } 303 bool result = transport_->get()->SetRemoteTransportDescription(description, 304 action, 305 error_desc); 306 if (result) 307 remote_description_set_ = true; 308 return result; 309 } 310 311 void TransportProxy::OnSignalingReady() { 312 // If we're starting a new allocation sequence, reset our state. 313 set_candidates_allocated(false); 314 transport_->get()->OnSignalingReady(); 315 } 316 317 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates, 318 std::string* error) { 319 // Ensure the transport is negotiated before handling candidates. 320 // TODO(juberti): Remove this once everybody calls SetLocalTD. 321 CompleteNegotiation(); 322 323 // Verify each candidate before passing down to transport layer. 324 for (Candidates::const_iterator cand = candidates.begin(); 325 cand != candidates.end(); ++cand) { 326 if (!transport_->get()->VerifyCandidate(*cand, error)) 327 return false; 328 if (!HasChannel(cand->component())) { 329 *error = "Candidate has unknown component: " + cand->ToString() + 330 " for content: " + content_name_; 331 return false; 332 } 333 } 334 transport_->get()->OnRemoteCandidates(candidates); 335 return true; 336 } 337 338 void TransportProxy::SetIdentity( 339 rtc::SSLIdentity* identity) { 340 transport_->get()->SetIdentity(identity); 341 } 342 343 std::string BaseSession::StateToString(State state) { 344 switch (state) { 345 case Session::STATE_INIT: 346 return "STATE_INIT"; 347 case Session::STATE_SENTINITIATE: 348 return "STATE_SENTINITIATE"; 349 case Session::STATE_RECEIVEDINITIATE: 350 return "STATE_RECEIVEDINITIATE"; 351 case Session::STATE_SENTPRACCEPT: 352 return "STATE_SENTPRACCEPT"; 353 case Session::STATE_SENTACCEPT: 354 return "STATE_SENTACCEPT"; 355 case Session::STATE_RECEIVEDPRACCEPT: 356 return "STATE_RECEIVEDPRACCEPT"; 357 case Session::STATE_RECEIVEDACCEPT: 358 return "STATE_RECEIVEDACCEPT"; 359 case Session::STATE_SENTMODIFY: 360 return "STATE_SENTMODIFY"; 361 case Session::STATE_RECEIVEDMODIFY: 362 return "STATE_RECEIVEDMODIFY"; 363 case Session::STATE_SENTREJECT: 364 return "STATE_SENTREJECT"; 365 case Session::STATE_RECEIVEDREJECT: 366 return "STATE_RECEIVEDREJECT"; 367 case Session::STATE_SENTREDIRECT: 368 return "STATE_SENTREDIRECT"; 369 case Session::STATE_SENTTERMINATE: 370 return "STATE_SENTTERMINATE"; 371 case Session::STATE_RECEIVEDTERMINATE: 372 return "STATE_RECEIVEDTERMINATE"; 373 case Session::STATE_INPROGRESS: 374 return "STATE_INPROGRESS"; 375 case Session::STATE_DEINIT: 376 return "STATE_DEINIT"; 377 default: 378 break; 379 } 380 return "STATE_" + rtc::ToString(state); 381 } 382 383 BaseSession::BaseSession(rtc::Thread* signaling_thread, 384 rtc::Thread* worker_thread, 385 PortAllocator* port_allocator, 386 const std::string& sid, 387 const std::string& content_type, 388 bool initiator) 389 : state_(STATE_INIT), 390 error_(ERROR_NONE), 391 signaling_thread_(signaling_thread), 392 worker_thread_(worker_thread), 393 port_allocator_(port_allocator), 394 sid_(sid), 395 content_type_(content_type), 396 transport_type_(NS_GINGLE_P2P), 397 initiator_(initiator), 398 identity_(NULL), 399 ice_tiebreaker_(rtc::CreateRandomId64()), 400 role_switch_(false) { 401 ASSERT(signaling_thread->IsCurrent()); 402 } 403 404 BaseSession::~BaseSession() { 405 ASSERT(signaling_thread()->IsCurrent()); 406 407 ASSERT(state_ != STATE_DEINIT); 408 LogState(state_, STATE_DEINIT); 409 state_ = STATE_DEINIT; 410 SignalState(this, state_); 411 412 for (TransportMap::iterator iter = transports_.begin(); 413 iter != transports_.end(); ++iter) { 414 delete iter->second; 415 } 416 } 417 418 const SessionDescription* BaseSession::local_description() const { 419 // TODO(tommi): Assert on thread correctness. 420 return local_description_.get(); 421 } 422 423 const SessionDescription* BaseSession::remote_description() const { 424 // TODO(tommi): Assert on thread correctness. 425 return remote_description_.get(); 426 } 427 428 SessionDescription* BaseSession::remote_description() { 429 // TODO(tommi): Assert on thread correctness. 430 return remote_description_.get(); 431 } 432 433 void BaseSession::set_local_description(const SessionDescription* sdesc) { 434 // TODO(tommi): Assert on thread correctness. 435 if (sdesc != local_description_.get()) 436 local_description_.reset(sdesc); 437 } 438 439 void BaseSession::set_remote_description(SessionDescription* sdesc) { 440 // TODO(tommi): Assert on thread correctness. 441 if (sdesc != remote_description_) 442 remote_description_.reset(sdesc); 443 } 444 445 const SessionDescription* BaseSession::initiator_description() const { 446 // TODO(tommi): Assert on thread correctness. 447 return initiator_ ? local_description_.get() : remote_description_.get(); 448 } 449 450 bool BaseSession::SetIdentity(rtc::SSLIdentity* identity) { 451 if (identity_) 452 return false; 453 identity_ = identity; 454 for (TransportMap::iterator iter = transports_.begin(); 455 iter != transports_.end(); ++iter) { 456 iter->second->SetIdentity(identity_); 457 } 458 return true; 459 } 460 461 bool BaseSession::PushdownTransportDescription(ContentSource source, 462 ContentAction action, 463 std::string* error_desc) { 464 if (source == CS_LOCAL) { 465 return PushdownLocalTransportDescription(local_description(), 466 action, 467 error_desc); 468 } 469 return PushdownRemoteTransportDescription(remote_description(), 470 action, 471 error_desc); 472 } 473 474 bool BaseSession::PushdownLocalTransportDescription( 475 const SessionDescription* sdesc, 476 ContentAction action, 477 std::string* error_desc) { 478 // Update the Transports with the right information, and trigger them to 479 // start connecting. 480 for (TransportMap::iterator iter = transports_.begin(); 481 iter != transports_.end(); ++iter) { 482 // If no transport info was in this session description, ret == false 483 // and we just skip this one. 484 TransportDescription tdesc; 485 bool ret = GetTransportDescription( 486 sdesc, iter->second->content_name(), &tdesc); 487 if (ret) { 488 if (!iter->second->SetLocalTransportDescription(tdesc, action, 489 error_desc)) { 490 return false; 491 } 492 493 iter->second->ConnectChannels(); 494 } 495 } 496 497 return true; 498 } 499 500 bool BaseSession::PushdownRemoteTransportDescription( 501 const SessionDescription* sdesc, 502 ContentAction action, 503 std::string* error_desc) { 504 // Update the Transports with the right information. 505 for (TransportMap::iterator iter = transports_.begin(); 506 iter != transports_.end(); ++iter) { 507 TransportDescription tdesc; 508 509 // If no transport info was in this session description, ret == false 510 // and we just skip this one. 511 bool ret = GetTransportDescription( 512 sdesc, iter->second->content_name(), &tdesc); 513 if (ret) { 514 if (!iter->second->SetRemoteTransportDescription(tdesc, action, 515 error_desc)) { 516 return false; 517 } 518 } 519 } 520 521 return true; 522 } 523 524 TransportChannel* BaseSession::CreateChannel(const std::string& content_name, 525 const std::string& channel_name, 526 int component) { 527 // We create the proxy "on demand" here because we need to support 528 // creating channels at any time, even before we send or receive 529 // initiate messages, which is before we create the transports. 530 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name); 531 return transproxy->CreateChannel(channel_name, component); 532 } 533 534 TransportChannel* BaseSession::GetChannel(const std::string& content_name, 535 int component) { 536 TransportProxy* transproxy = GetTransportProxy(content_name); 537 if (transproxy == NULL) 538 return NULL; 539 540 return transproxy->GetChannel(component); 541 } 542 543 void BaseSession::DestroyChannel(const std::string& content_name, 544 int component) { 545 TransportProxy* transproxy = GetTransportProxy(content_name); 546 ASSERT(transproxy != NULL); 547 transproxy->DestroyChannel(component); 548 } 549 550 TransportProxy* BaseSession::GetOrCreateTransportProxy( 551 const std::string& content_name) { 552 TransportProxy* transproxy = GetTransportProxy(content_name); 553 if (transproxy) 554 return transproxy; 555 556 Transport* transport = CreateTransport(content_name); 557 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED); 558 transport->SetIceTiebreaker(ice_tiebreaker_); 559 // TODO: Connect all the Transport signals to TransportProxy 560 // then to the BaseSession. 561 transport->SignalConnecting.connect( 562 this, &BaseSession::OnTransportConnecting); 563 transport->SignalWritableState.connect( 564 this, &BaseSession::OnTransportWritable); 565 transport->SignalRequestSignaling.connect( 566 this, &BaseSession::OnTransportRequestSignaling); 567 transport->SignalTransportError.connect( 568 this, &BaseSession::OnTransportSendError); 569 transport->SignalRouteChange.connect( 570 this, &BaseSession::OnTransportRouteChange); 571 transport->SignalCandidatesAllocationDone.connect( 572 this, &BaseSession::OnTransportCandidatesAllocationDone); 573 transport->SignalRoleConflict.connect( 574 this, &BaseSession::OnRoleConflict); 575 transport->SignalCompleted.connect( 576 this, &BaseSession::OnTransportCompleted); 577 transport->SignalFailed.connect( 578 this, &BaseSession::OnTransportFailed); 579 580 transproxy = new TransportProxy(worker_thread_, sid_, content_name, 581 new TransportWrapper(transport)); 582 transproxy->SignalCandidatesReady.connect( 583 this, &BaseSession::OnTransportProxyCandidatesReady); 584 if (identity_) 585 transproxy->SetIdentity(identity_); 586 transports_[content_name] = transproxy; 587 588 return transproxy; 589 } 590 591 Transport* BaseSession::GetTransport(const std::string& content_name) { 592 TransportProxy* transproxy = GetTransportProxy(content_name); 593 if (transproxy == NULL) 594 return NULL; 595 return transproxy->impl(); 596 } 597 598 TransportProxy* BaseSession::GetTransportProxy( 599 const std::string& content_name) { 600 TransportMap::iterator iter = transports_.find(content_name); 601 return (iter != transports_.end()) ? iter->second : NULL; 602 } 603 604 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) { 605 for (TransportMap::iterator iter = transports_.begin(); 606 iter != transports_.end(); ++iter) { 607 TransportProxy* transproxy = iter->second; 608 if (transproxy->impl() == transport) { 609 return transproxy; 610 } 611 } 612 return NULL; 613 } 614 615 TransportProxy* BaseSession::GetFirstTransportProxy() { 616 if (transports_.empty()) 617 return NULL; 618 return transports_.begin()->second; 619 } 620 621 void BaseSession::DestroyTransportProxy( 622 const std::string& content_name) { 623 TransportMap::iterator iter = transports_.find(content_name); 624 if (iter != transports_.end()) { 625 delete iter->second; 626 transports_.erase(content_name); 627 } 628 } 629 630 cricket::Transport* BaseSession::CreateTransport( 631 const std::string& content_name) { 632 ASSERT(transport_type_ == NS_GINGLE_P2P); 633 return new cricket::DtlsTransport<P2PTransport>( 634 signaling_thread(), worker_thread(), content_name, 635 port_allocator(), identity_); 636 } 637 638 bool BaseSession::GetStats(SessionStats* stats) { 639 for (TransportMap::iterator iter = transports_.begin(); 640 iter != transports_.end(); ++iter) { 641 std::string proxy_id = iter->second->content_name(); 642 // We are ignoring not-yet-instantiated transports. 643 if (iter->second->impl()) { 644 std::string transport_id = iter->second->impl()->content_name(); 645 stats->proxy_to_transport[proxy_id] = transport_id; 646 if (stats->transport_stats.find(transport_id) 647 == stats->transport_stats.end()) { 648 TransportStats subinfos; 649 if (!iter->second->impl()->GetStats(&subinfos)) { 650 return false; 651 } 652 stats->transport_stats[transport_id] = subinfos; 653 } 654 } 655 } 656 return true; 657 } 658 659 void BaseSession::SetState(State state) { 660 ASSERT(signaling_thread_->IsCurrent()); 661 if (state != state_) { 662 LogState(state_, state); 663 state_ = state; 664 SignalState(this, state_); 665 signaling_thread_->Post(this, MSG_STATE); 666 } 667 SignalNewDescription(); 668 } 669 670 void BaseSession::SetError(Error error, const std::string& error_desc) { 671 ASSERT(signaling_thread_->IsCurrent()); 672 if (error != error_) { 673 error_ = error; 674 error_desc_ = error_desc; 675 SignalError(this, error); 676 } 677 } 678 679 void BaseSession::OnSignalingReady() { 680 ASSERT(signaling_thread()->IsCurrent()); 681 for (TransportMap::iterator iter = transports_.begin(); 682 iter != transports_.end(); ++iter) { 683 iter->second->OnSignalingReady(); 684 } 685 } 686 687 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to 688 // start, remove this method once everyone calls PushdownLocalTD. 689 void BaseSession::SpeculativelyConnectAllTransportChannels() { 690 // Put all transports into the connecting state. 691 for (TransportMap::iterator iter = transports_.begin(); 692 iter != transports_.end(); ++iter) { 693 iter->second->ConnectChannels(); 694 } 695 } 696 697 bool BaseSession::OnRemoteCandidates(const std::string& content_name, 698 const Candidates& candidates, 699 std::string* error) { 700 // Give candidates to the appropriate transport, and tell that transport 701 // to start connecting, if it's not already doing so. 702 TransportProxy* transproxy = GetTransportProxy(content_name); 703 if (!transproxy) { 704 *error = "Unknown content name " + content_name; 705 return false; 706 } 707 if (!transproxy->OnRemoteCandidates(candidates, error)) { 708 return false; 709 } 710 // TODO(juberti): Remove this call once we can be sure that we always have 711 // a local transport description (which will trigger the connection). 712 transproxy->ConnectChannels(); 713 return true; 714 } 715 716 bool BaseSession::MaybeEnableMuxingSupport() { 717 // We need both a local and remote description to decide if we should mux. 718 if ((state_ == STATE_SENTINITIATE || 719 state_ == STATE_RECEIVEDINITIATE) && 720 ((local_description_ == NULL) || 721 (remote_description_ == NULL))) { 722 return false; 723 } 724 725 // In order to perform the multiplexing, we need all proxies to be in the 726 // negotiated state, i.e. to have implementations underneath. 727 // Ensure that this is the case, regardless of whether we are going to mux. 728 for (TransportMap::iterator iter = transports_.begin(); 729 iter != transports_.end(); ++iter) { 730 ASSERT(iter->second->negotiated()); 731 if (!iter->second->negotiated()) 732 return false; 733 } 734 735 // If both sides agree to BUNDLE, mux all the specified contents onto the 736 // transport belonging to the first content name in the BUNDLE group. 737 // If the contents are already muxed, this will be a no-op. 738 // TODO(juberti): Should this check that local and remote have configured 739 // BUNDLE the same way? 740 bool candidates_allocated = IsCandidateAllocationDone(); 741 const ContentGroup* local_bundle_group = 742 local_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 743 const ContentGroup* remote_bundle_group = 744 remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 745 if (local_bundle_group && remote_bundle_group && 746 local_bundle_group->FirstContentName()) { 747 const std::string* content_name = local_bundle_group->FirstContentName(); 748 const ContentInfo* content = 749 local_description_->GetContentByName(*content_name); 750 ASSERT(content != NULL); 751 if (!SetSelectedProxy(content->name, local_bundle_group)) { 752 LOG(LS_WARNING) << "Failed to set up BUNDLE"; 753 return false; 754 } 755 756 // If we weren't done gathering before, we might be done now, as a result 757 // of enabling mux. 758 LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: " 759 << *content_name; 760 if (!candidates_allocated) { 761 MaybeCandidateAllocationDone(); 762 } 763 } else { 764 LOG(LS_INFO) << "No BUNDLE information, not bundling."; 765 } 766 return true; 767 } 768 769 bool BaseSession::SetSelectedProxy(const std::string& content_name, 770 const ContentGroup* muxed_group) { 771 TransportProxy* selected_proxy = GetTransportProxy(content_name); 772 if (!selected_proxy) { 773 return false; 774 } 775 776 ASSERT(selected_proxy->negotiated()); 777 for (TransportMap::iterator iter = transports_.begin(); 778 iter != transports_.end(); ++iter) { 779 // If content is part of the mux group, then repoint its proxy at the 780 // transport object that we have chosen to mux onto. If the proxy 781 // is already pointing at the right object, it will be a no-op. 782 if (muxed_group->HasContentName(iter->first) && 783 !iter->second->SetupMux(selected_proxy)) { 784 return false; 785 } 786 } 787 return true; 788 } 789 790 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) { 791 // TODO(juberti): This is a clunky way of processing the done signal. Instead, 792 // TransportProxy should receive the done signal directly, set its allocated 793 // flag internally, and then reissue the done signal to Session. 794 // Overall we should make TransportProxy receive *all* the signals from 795 // Transport, since this removes the need to manually iterate over all 796 // the transports, as is needed to make sure signals are handled properly 797 // when BUNDLEing. 798 // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways 799 // that make it prohibitively difficult to run dbg builds. Disabled for now. 800 //ASSERT(!IsCandidateAllocationDone()); 801 for (TransportMap::iterator iter = transports_.begin(); 802 iter != transports_.end(); ++iter) { 803 if (iter->second->impl() == transport) { 804 iter->second->set_candidates_allocated(true); 805 } 806 } 807 MaybeCandidateAllocationDone(); 808 } 809 810 bool BaseSession::IsCandidateAllocationDone() const { 811 for (TransportMap::const_iterator iter = transports_.begin(); 812 iter != transports_.end(); ++iter) { 813 if (!iter->second->candidates_allocated()) 814 return false; 815 } 816 return true; 817 } 818 819 void BaseSession::MaybeCandidateAllocationDone() { 820 if (IsCandidateAllocationDone()) { 821 LOG(LS_INFO) << "Candidate gathering is complete."; 822 OnCandidatesAllocationDone(); 823 } 824 } 825 826 void BaseSession::OnRoleConflict() { 827 if (role_switch_) { 828 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport."; 829 return; 830 } 831 832 role_switch_ = true; 833 for (TransportMap::iterator iter = transports_.begin(); 834 iter != transports_.end(); ++iter) { 835 // Role will be reverse of initial role setting. 836 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING; 837 iter->second->SetIceRole(role); 838 } 839 } 840 841 void BaseSession::LogState(State old_state, State new_state) { 842 LOG(LS_INFO) << "Session:" << id() 843 << " Old state:" << StateToString(old_state) 844 << " New state:" << StateToString(new_state) 845 << " Type:" << content_type() 846 << " Transport:" << transport_type(); 847 } 848 849 // static 850 bool BaseSession::GetTransportDescription(const SessionDescription* description, 851 const std::string& content_name, 852 TransportDescription* tdesc) { 853 if (!description || !tdesc) { 854 return false; 855 } 856 const TransportInfo* transport_info = 857 description->GetTransportInfoByName(content_name); 858 if (!transport_info) { 859 return false; 860 } 861 *tdesc = transport_info->description; 862 return true; 863 } 864 865 void BaseSession::SignalNewDescription() { 866 ContentAction action; 867 ContentSource source; 868 if (!GetContentAction(&action, &source)) { 869 return; 870 } 871 if (source == CS_LOCAL) { 872 SignalNewLocalDescription(this, action); 873 } else { 874 SignalNewRemoteDescription(this, action); 875 } 876 } 877 878 bool BaseSession::GetContentAction(ContentAction* action, 879 ContentSource* source) { 880 switch (state_) { 881 // new local description 882 case STATE_SENTINITIATE: 883 *action = CA_OFFER; 884 *source = CS_LOCAL; 885 break; 886 case STATE_SENTPRACCEPT: 887 *action = CA_PRANSWER; 888 *source = CS_LOCAL; 889 break; 890 case STATE_SENTACCEPT: 891 *action = CA_ANSWER; 892 *source = CS_LOCAL; 893 break; 894 // new remote description 895 case STATE_RECEIVEDINITIATE: 896 *action = CA_OFFER; 897 *source = CS_REMOTE; 898 break; 899 case STATE_RECEIVEDPRACCEPT: 900 *action = CA_PRANSWER; 901 *source = CS_REMOTE; 902 break; 903 case STATE_RECEIVEDACCEPT: 904 *action = CA_ANSWER; 905 *source = CS_REMOTE; 906 break; 907 default: 908 return false; 909 } 910 return true; 911 } 912 913 void BaseSession::OnMessage(rtc::Message *pmsg) { 914 switch (pmsg->message_id) { 915 case MSG_TIMEOUT: 916 // Session timeout has occured. 917 SetError(ERROR_TIME, "Session timeout has occured."); 918 break; 919 920 case MSG_STATE: 921 switch (state_) { 922 case STATE_SENTACCEPT: 923 case STATE_RECEIVEDACCEPT: 924 SetState(STATE_INPROGRESS); 925 break; 926 927 default: 928 // Explicitly ignoring some states here. 929 break; 930 } 931 break; 932 } 933 } 934 935 Session::Session(SessionManager* session_manager, 936 const std::string& local_name, 937 const std::string& initiator_name, 938 const std::string& sid, 939 const std::string& content_type, 940 SessionClient* client) 941 : BaseSession(session_manager->signaling_thread(), 942 session_manager->worker_thread(), 943 session_manager->port_allocator(), 944 sid, content_type, initiator_name == local_name) { 945 ASSERT(client != NULL); 946 session_manager_ = session_manager; 947 local_name_ = local_name; 948 initiator_name_ = initiator_name; 949 transport_parser_ = new P2PTransportParser(); 950 client_ = client; 951 initiate_acked_ = false; 952 current_protocol_ = PROTOCOL_HYBRID; 953 } 954 955 Session::~Session() { 956 delete transport_parser_; 957 } 958 959 bool Session::Initiate(const std::string& to, 960 const SessionDescription* sdesc) { 961 ASSERT(signaling_thread()->IsCurrent()); 962 SessionError error; 963 964 // Only from STATE_INIT 965 if (state() != STATE_INIT) 966 return false; 967 968 // Setup for signaling. 969 set_remote_name(to); 970 set_local_description(sdesc); 971 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()), 972 &error)) { 973 LOG(LS_ERROR) << "Could not create transports: " << error.text; 974 return false; 975 } 976 977 if (!SendInitiateMessage(sdesc, &error)) { 978 LOG(LS_ERROR) << "Could not send initiate message: " << error.text; 979 return false; 980 } 981 982 // We need to connect transport proxy and impl here so that we can process 983 // the TransportDescriptions. 984 SpeculativelyConnectAllTransportChannels(); 985 986 PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL); 987 SetState(Session::STATE_SENTINITIATE); 988 return true; 989 } 990 991 bool Session::Accept(const SessionDescription* sdesc) { 992 ASSERT(signaling_thread()->IsCurrent()); 993 994 // Only if just received initiate 995 if (state() != STATE_RECEIVEDINITIATE) 996 return false; 997 998 // Setup for signaling. 999 set_local_description(sdesc); 1000 1001 SessionError error; 1002 if (!SendAcceptMessage(sdesc, &error)) { 1003 LOG(LS_ERROR) << "Could not send accept message: " << error.text; 1004 return false; 1005 } 1006 // TODO(juberti): Add BUNDLE support to transport-info messages. 1007 PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL); 1008 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 1009 SetState(Session::STATE_SENTACCEPT); 1010 return true; 1011 } 1012 1013 bool Session::Reject(const std::string& reason) { 1014 ASSERT(signaling_thread()->IsCurrent()); 1015 1016 // Reject is sent in response to an initiate or modify, to reject the 1017 // request 1018 if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY) 1019 return false; 1020 1021 SessionError error; 1022 if (!SendRejectMessage(reason, &error)) { 1023 LOG(LS_ERROR) << "Could not send reject message: " << error.text; 1024 return false; 1025 } 1026 1027 SetState(STATE_SENTREJECT); 1028 return true; 1029 } 1030 1031 bool Session::TerminateWithReason(const std::string& reason) { 1032 ASSERT(signaling_thread()->IsCurrent()); 1033 1034 // Either side can terminate, at any time. 1035 switch (state()) { 1036 case STATE_SENTTERMINATE: 1037 case STATE_RECEIVEDTERMINATE: 1038 return false; 1039 1040 case STATE_SENTREJECT: 1041 case STATE_RECEIVEDREJECT: 1042 // We don't need to send terminate if we sent or received a reject... 1043 // it's implicit. 1044 break; 1045 1046 default: 1047 SessionError error; 1048 if (!SendTerminateMessage(reason, &error)) { 1049 LOG(LS_ERROR) << "Could not send terminate message: " << error.text; 1050 return false; 1051 } 1052 break; 1053 } 1054 1055 SetState(STATE_SENTTERMINATE); 1056 return true; 1057 } 1058 1059 bool Session::SendInfoMessage(const XmlElements& elems, 1060 const std::string& remote_name) { 1061 ASSERT(signaling_thread()->IsCurrent()); 1062 SessionError error; 1063 if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) { 1064 LOG(LS_ERROR) << "Could not send info message " << error.text; 1065 return false; 1066 } 1067 return true; 1068 } 1069 1070 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) { 1071 XmlElements elems; 1072 WriteError write_error; 1073 if (!WriteDescriptionInfo(current_protocol_, 1074 contents, 1075 GetContentParsers(), 1076 &elems, &write_error)) { 1077 LOG(LS_ERROR) << "Could not write description info message: " 1078 << write_error.text; 1079 return false; 1080 } 1081 SessionError error; 1082 if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) { 1083 LOG(LS_ERROR) << "Could not send description info message: " 1084 << error.text; 1085 return false; 1086 } 1087 return true; 1088 } 1089 1090 TransportInfos Session::GetEmptyTransportInfos( 1091 const ContentInfos& contents) const { 1092 TransportInfos tinfos; 1093 for (ContentInfos::const_iterator content = contents.begin(); 1094 content != contents.end(); ++content) { 1095 tinfos.push_back(TransportInfo(content->name, 1096 TransportDescription(transport_type(), 1097 std::string(), 1098 std::string()))); 1099 } 1100 return tinfos; 1101 } 1102 1103 bool Session::OnRemoteCandidates( 1104 const TransportInfos& tinfos, ParseError* error) { 1105 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1106 tinfo != tinfos.end(); ++tinfo) { 1107 std::string str_error; 1108 if (!BaseSession::OnRemoteCandidates( 1109 tinfo->content_name, tinfo->description.candidates, &str_error)) { 1110 return BadParse(str_error, error); 1111 } 1112 } 1113 return true; 1114 } 1115 1116 bool Session::CreateTransportProxies(const TransportInfos& tinfos, 1117 SessionError* error) { 1118 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1119 tinfo != tinfos.end(); ++tinfo) { 1120 if (tinfo->description.transport_type != transport_type()) { 1121 error->SetText("No supported transport in offer."); 1122 return false; 1123 } 1124 1125 GetOrCreateTransportProxy(tinfo->content_name); 1126 } 1127 return true; 1128 } 1129 1130 TransportParserMap Session::GetTransportParsers() { 1131 TransportParserMap parsers; 1132 parsers[transport_type()] = transport_parser_; 1133 return parsers; 1134 } 1135 1136 CandidateTranslatorMap Session::GetCandidateTranslators() { 1137 CandidateTranslatorMap translators; 1138 // NOTE: This technique makes it impossible to parse G-ICE 1139 // candidates in session-initiate messages because the channels 1140 // aren't yet created at that point. Since we don't use candidates 1141 // in session-initiate messages, we should be OK. Once we switch to 1142 // ICE, this translation shouldn't be necessary. 1143 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1144 iter != transport_proxies().end(); ++iter) { 1145 translators[iter->first] = iter->second; 1146 } 1147 return translators; 1148 } 1149 1150 ContentParserMap Session::GetContentParsers() { 1151 ContentParserMap parsers; 1152 parsers[content_type()] = client_; 1153 // We need to be able parse both RTP-based and SCTP-based Jingle 1154 // with the same client. 1155 if (content_type() == NS_JINGLE_RTP) { 1156 parsers[NS_JINGLE_DRAFT_SCTP] = client_; 1157 } 1158 return parsers; 1159 } 1160 1161 void Session::OnTransportRequestSignaling(Transport* transport) { 1162 ASSERT(signaling_thread()->IsCurrent()); 1163 TransportProxy* transproxy = GetTransportProxy(transport); 1164 ASSERT(transproxy != NULL); 1165 if (transproxy) { 1166 // Reset candidate allocation status for the transport proxy. 1167 transproxy->set_candidates_allocated(false); 1168 } 1169 SignalRequestSignaling(this); 1170 } 1171 1172 void Session::OnTransportConnecting(Transport* transport) { 1173 // This is an indication that we should begin watching the writability 1174 // state of the transport. 1175 OnTransportWritable(transport); 1176 } 1177 1178 void Session::OnTransportWritable(Transport* transport) { 1179 ASSERT(signaling_thread()->IsCurrent()); 1180 1181 // If the transport is not writable, start a timer to make sure that it 1182 // becomes writable within a reasonable amount of time. If it does not, we 1183 // terminate since we can't actually send data. If the transport is writable, 1184 // cancel the timer. Note that writability transitions may occur repeatedly 1185 // during the lifetime of the session. 1186 signaling_thread()->Clear(this, MSG_TIMEOUT); 1187 if (transport->HasChannels() && !transport->writable()) { 1188 signaling_thread()->PostDelayed( 1189 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); 1190 } 1191 } 1192 1193 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy, 1194 const Candidates& candidates) { 1195 ASSERT(signaling_thread()->IsCurrent()); 1196 if (transproxy != NULL) { 1197 if (initiator() && !initiate_acked_) { 1198 // TODO: This is to work around server re-ordering 1199 // messages. We send the candidates once the session-initiate 1200 // is acked. Once we have fixed the server to guarantee message 1201 // order, we can remove this case. 1202 transproxy->AddUnsentCandidates(candidates); 1203 } else { 1204 if (!transproxy->negotiated()) { 1205 transproxy->AddSentCandidates(candidates); 1206 } 1207 SessionError error; 1208 if (!SendTransportInfoMessage(transproxy, candidates, &error)) { 1209 LOG(LS_ERROR) << "Could not send transport info message: " 1210 << error.text; 1211 return; 1212 } 1213 } 1214 } 1215 } 1216 1217 void Session::OnTransportSendError(Transport* transport, 1218 const buzz::XmlElement* stanza, 1219 const buzz::QName& name, 1220 const std::string& type, 1221 const std::string& text, 1222 const buzz::XmlElement* extra_info) { 1223 ASSERT(signaling_thread()->IsCurrent()); 1224 SignalErrorMessage(this, stanza, name, type, text, extra_info); 1225 } 1226 1227 void Session::OnIncomingMessage(const SessionMessage& msg) { 1228 ASSERT(signaling_thread()->IsCurrent()); 1229 ASSERT(state() == STATE_INIT || msg.from == remote_name()); 1230 1231 if (current_protocol_== PROTOCOL_HYBRID) { 1232 if (msg.protocol == PROTOCOL_GINGLE) { 1233 current_protocol_ = PROTOCOL_GINGLE; 1234 } else { 1235 current_protocol_ = PROTOCOL_JINGLE; 1236 } 1237 } 1238 1239 bool valid = false; 1240 MessageError error; 1241 switch (msg.type) { 1242 case ACTION_SESSION_INITIATE: 1243 valid = OnInitiateMessage(msg, &error); 1244 break; 1245 case ACTION_SESSION_INFO: 1246 valid = OnInfoMessage(msg); 1247 break; 1248 case ACTION_SESSION_ACCEPT: 1249 valid = OnAcceptMessage(msg, &error); 1250 break; 1251 case ACTION_SESSION_REJECT: 1252 valid = OnRejectMessage(msg, &error); 1253 break; 1254 case ACTION_SESSION_TERMINATE: 1255 valid = OnTerminateMessage(msg, &error); 1256 break; 1257 case ACTION_TRANSPORT_INFO: 1258 valid = OnTransportInfoMessage(msg, &error); 1259 break; 1260 case ACTION_TRANSPORT_ACCEPT: 1261 valid = OnTransportAcceptMessage(msg, &error); 1262 break; 1263 case ACTION_DESCRIPTION_INFO: 1264 valid = OnDescriptionInfoMessage(msg, &error); 1265 break; 1266 default: 1267 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST, 1268 "unknown session message type", 1269 &error); 1270 } 1271 1272 if (valid) { 1273 SendAcknowledgementMessage(msg.stanza); 1274 } else { 1275 SignalErrorMessage(this, msg.stanza, error.type, 1276 "modify", error.text, NULL); 1277 } 1278 } 1279 1280 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza, 1281 const buzz::XmlElement* response_stanza, 1282 const SessionMessage& msg) { 1283 ASSERT(signaling_thread()->IsCurrent()); 1284 1285 if (msg.type == ACTION_SESSION_INITIATE) { 1286 OnInitiateAcked(); 1287 } 1288 } 1289 1290 void Session::OnInitiateAcked() { 1291 // TODO: This is to work around server re-ordering 1292 // messages. We send the candidates once the session-initiate 1293 // is acked. Once we have fixed the server to guarantee message 1294 // order, we can remove this case. 1295 if (!initiate_acked_) { 1296 initiate_acked_ = true; 1297 SessionError error; 1298 SendAllUnsentTransportInfoMessages(&error); 1299 } 1300 } 1301 1302 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza, 1303 const buzz::XmlElement* error_stanza) { 1304 ASSERT(signaling_thread()->IsCurrent()); 1305 1306 SessionMessage msg; 1307 ParseError parse_error; 1308 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) { 1309 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text 1310 << ":" << orig_stanza; 1311 return; 1312 } 1313 1314 // If the error is a session redirect, call OnRedirectError, which will 1315 // continue the session with a new remote JID. 1316 SessionRedirect redirect; 1317 if (FindSessionRedirect(error_stanza, &redirect)) { 1318 SessionError error; 1319 if (!OnRedirectError(redirect, &error)) { 1320 // TODO: Should we send a message back? The standard 1321 // says nothing about it. 1322 std::ostringstream desc; 1323 desc << "Failed to redirect: " << error.text; 1324 LOG(LS_ERROR) << desc.str(); 1325 SetError(ERROR_RESPONSE, desc.str()); 1326 } 1327 return; 1328 } 1329 1330 std::string error_type = "cancel"; 1331 1332 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR); 1333 if (error) { 1334 error_type = error->Attr(buzz::QN_TYPE); 1335 1336 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n" 1337 << "in response to:\n" << orig_stanza->Str(); 1338 } else { 1339 // don't crash if <error> is missing 1340 LOG(LS_ERROR) << "Session error without <error/> element, ignoring"; 1341 return; 1342 } 1343 1344 if (msg.type == ACTION_TRANSPORT_INFO) { 1345 // Transport messages frequently generate errors because they are sent right 1346 // when we detect a network failure. For that reason, we ignore such 1347 // errors, because if we do not establish writability again, we will 1348 // terminate anyway. The exceptions are transport-specific error tags, 1349 // which we pass on to the respective transport. 1350 } else if ((error_type != "continue") && (error_type != "wait")) { 1351 // We do not set an error if the other side said it is okay to continue 1352 // (possibly after waiting). These errors can be ignored. 1353 SetError(ERROR_RESPONSE, ""); 1354 } 1355 } 1356 1357 bool Session::OnInitiateMessage(const SessionMessage& msg, 1358 MessageError* error) { 1359 if (!CheckState(STATE_INIT, error)) 1360 return false; 1361 1362 SessionInitiate init; 1363 if (!ParseSessionInitiate(msg.protocol, msg.action_elem, 1364 GetContentParsers(), GetTransportParsers(), 1365 GetCandidateTranslators(), 1366 &init, error)) 1367 return false; 1368 1369 SessionError session_error; 1370 if (!CreateTransportProxies(init.transports, &session_error)) { 1371 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE, 1372 session_error.text, error); 1373 } 1374 1375 set_remote_name(msg.from); 1376 set_initiator_name(msg.initiator); 1377 set_remote_description(new SessionDescription(init.ClearContents(), 1378 init.transports, 1379 init.groups)); 1380 // Updating transport with TransportDescription. 1381 PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL); 1382 SetState(STATE_RECEIVEDINITIATE); 1383 1384 // Users of Session may listen to state change and call Reject(). 1385 if (state() != STATE_SENTREJECT) { 1386 if (!OnRemoteCandidates(init.transports, error)) 1387 return false; 1388 1389 // TODO(juberti): Auto-generate and push down the local transport answer. 1390 // This is necessary for trickling to work with RFC 5245 ICE. 1391 } 1392 return true; 1393 } 1394 1395 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) { 1396 if (!CheckState(STATE_SENTINITIATE, error)) 1397 return false; 1398 1399 SessionAccept accept; 1400 if (!ParseSessionAccept(msg.protocol, msg.action_elem, 1401 GetContentParsers(), GetTransportParsers(), 1402 GetCandidateTranslators(), 1403 &accept, error)) { 1404 return false; 1405 } 1406 1407 // If we get an accept, we can assume the initiate has been 1408 // received, even if we haven't gotten an IQ response. 1409 OnInitiateAcked(); 1410 1411 set_remote_description(new SessionDescription(accept.ClearContents(), 1412 accept.transports, 1413 accept.groups)); 1414 // Updating transport with TransportDescription. 1415 PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL); 1416 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 1417 SetState(STATE_RECEIVEDACCEPT); 1418 1419 if (!OnRemoteCandidates(accept.transports, error)) 1420 return false; 1421 1422 return true; 1423 } 1424 1425 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) { 1426 if (!CheckState(STATE_SENTINITIATE, error)) 1427 return false; 1428 1429 SetState(STATE_RECEIVEDREJECT); 1430 return true; 1431 } 1432 1433 bool Session::OnInfoMessage(const SessionMessage& msg) { 1434 SignalInfoMessage(this, msg.action_elem); 1435 return true; 1436 } 1437 1438 bool Session::OnTerminateMessage(const SessionMessage& msg, 1439 MessageError* error) { 1440 SessionTerminate term; 1441 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error)) 1442 return false; 1443 1444 SignalReceivedTerminateReason(this, term.reason); 1445 if (term.debug_reason != buzz::STR_EMPTY) { 1446 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason; 1447 } 1448 1449 SetState(STATE_RECEIVEDTERMINATE); 1450 return true; 1451 } 1452 1453 bool Session::OnTransportInfoMessage(const SessionMessage& msg, 1454 MessageError* error) { 1455 TransportInfos tinfos; 1456 if (!ParseTransportInfos(msg.protocol, msg.action_elem, 1457 initiator_description()->contents(), 1458 GetTransportParsers(), GetCandidateTranslators(), 1459 &tinfos, error)) 1460 return false; 1461 1462 if (!OnRemoteCandidates(tinfos, error)) 1463 return false; 1464 1465 return true; 1466 } 1467 1468 bool Session::OnTransportAcceptMessage(const SessionMessage& msg, 1469 MessageError* error) { 1470 // TODO: Currently here only for compatibility with 1471 // Gingle 1.1 clients (notably, Google Voice). 1472 return true; 1473 } 1474 1475 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg, 1476 MessageError* error) { 1477 if (!CheckState(STATE_INPROGRESS, error)) 1478 return false; 1479 1480 DescriptionInfo description_info; 1481 if (!ParseDescriptionInfo(msg.protocol, msg.action_elem, 1482 GetContentParsers(), GetTransportParsers(), 1483 GetCandidateTranslators(), 1484 &description_info, error)) { 1485 return false; 1486 } 1487 1488 ContentInfos& updated_contents = description_info.contents; 1489 1490 // TODO: Currently, reflector sends back 1491 // video stream updates even for an audio-only call, which causes 1492 // this to fail. Put this back once reflector is fixed. 1493 // 1494 // ContentInfos::iterator it; 1495 // First, ensure all updates are valid before modifying remote_description_. 1496 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1497 // if (remote_description()->GetContentByName(it->name) == NULL) { 1498 // return false; 1499 // } 1500 // } 1501 1502 // TODO: We used to replace contents from an update, but 1503 // that no longer works with partial updates. We need to figure out 1504 // a way to merge patial updates into contents. For now, users of 1505 // Session should listen to SignalRemoteDescriptionUpdate and handle 1506 // updates. They should not expect remote_description to be the 1507 // latest value. 1508 // 1509 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1510 // remote_description()->RemoveContentByName(it->name); 1511 // remote_description()->AddContent(it->name, it->type, it->description); 1512 // } 1513 // } 1514 1515 SignalRemoteDescriptionUpdate(this, updated_contents); 1516 return true; 1517 } 1518 1519 bool BareJidsEqual(const std::string& name1, 1520 const std::string& name2) { 1521 buzz::Jid jid1(name1); 1522 buzz::Jid jid2(name2); 1523 1524 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2); 1525 } 1526 1527 bool Session::OnRedirectError(const SessionRedirect& redirect, 1528 SessionError* error) { 1529 MessageError message_error; 1530 if (!CheckState(STATE_SENTINITIATE, &message_error)) { 1531 return BadWrite(message_error.text, error); 1532 } 1533 1534 if (!BareJidsEqual(remote_name(), redirect.target)) 1535 return BadWrite("Redirection not allowed: must be the same bare jid.", 1536 error); 1537 1538 // When we receive a redirect, we point the session at the new JID 1539 // and resend the candidates. 1540 set_remote_name(redirect.target); 1541 return (SendInitiateMessage(local_description(), error) && 1542 ResendAllTransportInfoMessages(error)); 1543 } 1544 1545 bool Session::CheckState(State expected, MessageError* error) { 1546 if (state() != expected) { 1547 // The server can deliver messages out of order/repeated for various 1548 // reasons. For example, if the server does not recive our iq response, 1549 // it could assume that the iq it sent was lost, and will then send 1550 // it again. Ideally, we should implement reliable messaging with 1551 // duplicate elimination. 1552 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED, 1553 "message not allowed in current state", 1554 error); 1555 } 1556 return true; 1557 } 1558 1559 void Session::SetError(Error error, const std::string& error_desc) { 1560 BaseSession::SetError(error, error_desc); 1561 if (error != ERROR_NONE) 1562 signaling_thread()->Post(this, MSG_ERROR); 1563 } 1564 1565 void Session::OnMessage(rtc::Message* pmsg) { 1566 // preserve this because BaseSession::OnMessage may modify it 1567 State orig_state = state(); 1568 1569 BaseSession::OnMessage(pmsg); 1570 1571 switch (pmsg->message_id) { 1572 case MSG_ERROR: 1573 TerminateWithReason(STR_TERMINATE_ERROR); 1574 break; 1575 1576 case MSG_STATE: 1577 switch (orig_state) { 1578 case STATE_SENTREJECT: 1579 case STATE_RECEIVEDREJECT: 1580 // Assume clean termination. 1581 Terminate(); 1582 break; 1583 1584 case STATE_SENTTERMINATE: 1585 case STATE_RECEIVEDTERMINATE: 1586 session_manager_->DestroySession(this); 1587 break; 1588 1589 default: 1590 // Explicitly ignoring some states here. 1591 break; 1592 } 1593 break; 1594 } 1595 } 1596 1597 bool Session::SendInitiateMessage(const SessionDescription* sdesc, 1598 SessionError* error) { 1599 SessionInitiate init; 1600 init.contents = sdesc->contents(); 1601 init.transports = GetEmptyTransportInfos(init.contents); 1602 init.groups = sdesc->groups(); 1603 return SendMessage(ACTION_SESSION_INITIATE, init, error); 1604 } 1605 1606 bool Session::WriteSessionAction( 1607 SignalingProtocol protocol, const SessionInitiate& init, 1608 XmlElements* elems, WriteError* error) { 1609 return WriteSessionInitiate(protocol, init.contents, init.transports, 1610 GetContentParsers(), GetTransportParsers(), 1611 GetCandidateTranslators(), init.groups, 1612 elems, error); 1613 } 1614 1615 bool Session::SendAcceptMessage(const SessionDescription* sdesc, 1616 SessionError* error) { 1617 XmlElements elems; 1618 if (!WriteSessionAccept(current_protocol_, 1619 sdesc->contents(), 1620 GetEmptyTransportInfos(sdesc->contents()), 1621 GetContentParsers(), GetTransportParsers(), 1622 GetCandidateTranslators(), sdesc->groups(), 1623 &elems, error)) { 1624 return false; 1625 } 1626 return SendMessage(ACTION_SESSION_ACCEPT, elems, error); 1627 } 1628 1629 bool Session::SendRejectMessage(const std::string& reason, 1630 SessionError* error) { 1631 SessionTerminate term(reason); 1632 return SendMessage(ACTION_SESSION_REJECT, term, error); 1633 } 1634 1635 bool Session::SendTerminateMessage(const std::string& reason, 1636 SessionError* error) { 1637 SessionTerminate term(reason); 1638 return SendMessage(ACTION_SESSION_TERMINATE, term, error); 1639 } 1640 1641 bool Session::WriteSessionAction(SignalingProtocol protocol, 1642 const SessionTerminate& term, 1643 XmlElements* elems, WriteError* error) { 1644 WriteSessionTerminate(protocol, term, elems); 1645 return true; 1646 } 1647 1648 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo, 1649 SessionError* error) { 1650 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error); 1651 } 1652 1653 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy, 1654 const Candidates& candidates, 1655 SessionError* error) { 1656 return SendTransportInfoMessage(TransportInfo(transproxy->content_name(), 1657 TransportDescription(transproxy->type(), std::vector<std::string>(), 1658 std::string(), std::string(), ICEMODE_FULL, 1659 CONNECTIONROLE_NONE, NULL, candidates)), error); 1660 } 1661 1662 bool Session::WriteSessionAction(SignalingProtocol protocol, 1663 const TransportInfo& tinfo, 1664 XmlElements* elems, WriteError* error) { 1665 TransportInfos tinfos; 1666 tinfos.push_back(tinfo); 1667 return WriteTransportInfos(protocol, tinfos, 1668 GetTransportParsers(), GetCandidateTranslators(), 1669 elems, error); 1670 } 1671 1672 bool Session::ResendAllTransportInfoMessages(SessionError* error) { 1673 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1674 iter != transport_proxies().end(); ++iter) { 1675 TransportProxy* transproxy = iter->second; 1676 if (transproxy->sent_candidates().size() > 0) { 1677 if (!SendTransportInfoMessage( 1678 transproxy, transproxy->sent_candidates(), error)) { 1679 LOG(LS_ERROR) << "Could not resend transport info messages: " 1680 << error->text; 1681 return false; 1682 } 1683 transproxy->ClearSentCandidates(); 1684 } 1685 } 1686 return true; 1687 } 1688 1689 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) { 1690 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1691 iter != transport_proxies().end(); ++iter) { 1692 TransportProxy* transproxy = iter->second; 1693 if (transproxy->unsent_candidates().size() > 0) { 1694 if (!SendTransportInfoMessage( 1695 transproxy, transproxy->unsent_candidates(), error)) { 1696 LOG(LS_ERROR) << "Could not send unsent transport info messages: " 1697 << error->text; 1698 return false; 1699 } 1700 transproxy->ClearUnsentCandidates(); 1701 } 1702 } 1703 return true; 1704 } 1705 1706 bool Session::SendMessage(ActionType type, const XmlElements& action_elems, 1707 SessionError* error) { 1708 return SendMessage(type, action_elems, remote_name(), error); 1709 } 1710 1711 bool Session::SendMessage(ActionType type, const XmlElements& action_elems, 1712 const std::string& remote_name, SessionError* error) { 1713 rtc::scoped_ptr<buzz::XmlElement> stanza( 1714 new buzz::XmlElement(buzz::QN_IQ)); 1715 1716 SessionMessage msg(current_protocol_, type, id(), initiator_name()); 1717 msg.to = remote_name; 1718 WriteSessionMessage(msg, action_elems, stanza.get()); 1719 1720 SignalOutgoingMessage(this, stanza.get()); 1721 return true; 1722 } 1723 1724 template <typename Action> 1725 bool Session::SendMessage(ActionType type, const Action& action, 1726 SessionError* error) { 1727 rtc::scoped_ptr<buzz::XmlElement> stanza( 1728 new buzz::XmlElement(buzz::QN_IQ)); 1729 if (!WriteActionMessage(type, action, stanza.get(), error)) 1730 return false; 1731 1732 SignalOutgoingMessage(this, stanza.get()); 1733 return true; 1734 } 1735 1736 template <typename Action> 1737 bool Session::WriteActionMessage(ActionType type, const Action& action, 1738 buzz::XmlElement* stanza, 1739 WriteError* error) { 1740 if (current_protocol_ == PROTOCOL_HYBRID) { 1741 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error)) 1742 return false; 1743 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error)) 1744 return false; 1745 } else { 1746 if (!WriteActionMessage(current_protocol_, type, action, stanza, error)) 1747 return false; 1748 } 1749 return true; 1750 } 1751 1752 template <typename Action> 1753 bool Session::WriteActionMessage(SignalingProtocol protocol, 1754 ActionType type, const Action& action, 1755 buzz::XmlElement* stanza, WriteError* error) { 1756 XmlElements action_elems; 1757 if (!WriteSessionAction(protocol, action, &action_elems, error)) 1758 return false; 1759 1760 SessionMessage msg(protocol, type, id(), initiator_name()); 1761 msg.to = remote_name(); 1762 1763 WriteSessionMessage(msg, action_elems, stanza); 1764 return true; 1765 } 1766 1767 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) { 1768 rtc::scoped_ptr<buzz::XmlElement> ack( 1769 new buzz::XmlElement(buzz::QN_IQ)); 1770 ack->SetAttr(buzz::QN_TO, remote_name()); 1771 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID)); 1772 ack->SetAttr(buzz::QN_TYPE, "result"); 1773 1774 SignalOutgoingMessage(this, ack.get()); 1775 } 1776 1777 } // namespace cricket 1778