1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/session.h" 29 30 #include "talk/base/bind.h" 31 #include "talk/base/common.h" 32 #include "talk/base/logging.h" 33 #include "talk/base/helpers.h" 34 #include "talk/base/scoped_ptr.h" 35 #include "talk/base/sslstreamadapter.h" 36 #include "talk/xmpp/constants.h" 37 #include "talk/xmpp/jid.h" 38 #include "talk/p2p/base/dtlstransport.h" 39 #include "talk/p2p/base/p2ptransport.h" 40 #include "talk/p2p/base/sessionclient.h" 41 #include "talk/p2p/base/transport.h" 42 #include "talk/p2p/base/transportchannelproxy.h" 43 #include "talk/p2p/base/transportinfo.h" 44 45 #include "talk/p2p/base/constants.h" 46 47 namespace cricket { 48 49 using talk_base::Bind; 50 51 bool BadMessage(const buzz::QName type, 52 const std::string& text, 53 MessageError* err) { 54 err->SetType(type); 55 err->SetText(text); 56 return false; 57 } 58 59 TransportProxy::~TransportProxy() { 60 for (ChannelMap::iterator iter = channels_.begin(); 61 iter != channels_.end(); ++iter) { 62 iter->second->SignalDestroyed(iter->second); 63 delete iter->second; 64 } 65 } 66 67 std::string TransportProxy::type() const { 68 return transport_->get()->type(); 69 } 70 71 TransportChannel* TransportProxy::GetChannel(int component) { 72 ASSERT(talk_base::Thread::Current() == worker_thread_); 73 return GetChannelProxy(component); 74 } 75 76 TransportChannel* TransportProxy::CreateChannel( 77 const std::string& name, int component) { 78 ASSERT(talk_base::Thread::Current() == worker_thread_); 79 ASSERT(GetChannel(component) == NULL); 80 ASSERT(!transport_->get()->HasChannel(component)); 81 82 // We always create a proxy in case we need to change out the transport later. 83 TransportChannelProxy* channel = 84 new TransportChannelProxy(content_name(), name, component); 85 channels_[component] = channel; 86 87 // If we're already negotiated, create an impl and hook it up to the proxy 88 // channel. If we're connecting, create an impl but don't hook it up yet. 89 if (negotiated_) { 90 SetupChannelProxy_w(component, channel); 91 } else if (connecting_) { 92 GetOrCreateChannelProxyImpl_w(component); 93 } 94 return channel; 95 } 96 97 bool TransportProxy::HasChannel(int component) { 98 return transport_->get()->HasChannel(component); 99 } 100 101 void TransportProxy::DestroyChannel(int component) { 102 ASSERT(talk_base::Thread::Current() == worker_thread_); 103 TransportChannel* channel = GetChannel(component); 104 if (channel) { 105 // If the state of TransportProxy is not NEGOTIATED 106 // then TransportChannelProxy and its impl are not 107 // connected. Both must be connected before 108 // deletion. 109 if (!negotiated_) { 110 SetupChannelProxy_w(component, GetChannelProxy(component)); 111 } 112 113 channels_.erase(component); 114 channel->SignalDestroyed(channel); 115 delete channel; 116 } 117 } 118 119 void TransportProxy::ConnectChannels() { 120 if (!connecting_) { 121 if (!negotiated_) { 122 for (ChannelMap::iterator iter = channels_.begin(); 123 iter != channels_.end(); ++iter) { 124 GetOrCreateChannelProxyImpl(iter->first); 125 } 126 } 127 connecting_ = true; 128 } 129 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we 130 // don't have any channels yet, so we need to allow this method to be called 131 // multiple times. Once we fix Transport, we can move this call inside the 132 // if (!connecting_) block. 133 transport_->get()->ConnectChannels(); 134 } 135 136 void TransportProxy::CompleteNegotiation() { 137 if (!negotiated_) { 138 for (ChannelMap::iterator iter = channels_.begin(); 139 iter != channels_.end(); ++iter) { 140 SetupChannelProxy(iter->first, iter->second); 141 } 142 negotiated_ = true; 143 } 144 } 145 146 void TransportProxy::AddSentCandidates(const Candidates& candidates) { 147 for (Candidates::const_iterator cand = candidates.begin(); 148 cand != candidates.end(); ++cand) { 149 sent_candidates_.push_back(*cand); 150 } 151 } 152 153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) { 154 for (Candidates::const_iterator cand = candidates.begin(); 155 cand != candidates.end(); ++cand) { 156 unsent_candidates_.push_back(*cand); 157 } 158 } 159 160 bool TransportProxy::GetChannelNameFromComponent( 161 int component, std::string* channel_name) const { 162 const TransportChannelProxy* channel = GetChannelProxy(component); 163 if (channel == NULL) { 164 return false; 165 } 166 167 *channel_name = channel->name(); 168 return true; 169 } 170 171 bool TransportProxy::GetComponentFromChannelName( 172 const std::string& channel_name, int* component) const { 173 const TransportChannelProxy* channel = GetChannelProxyByName(channel_name); 174 if (channel == NULL) { 175 return false; 176 } 177 178 *component = channel->component(); 179 return true; 180 } 181 182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const { 183 ChannelMap::const_iterator iter = channels_.find(component); 184 return (iter != channels_.end()) ? iter->second : NULL; 185 } 186 187 TransportChannelProxy* TransportProxy::GetChannelProxyByName( 188 const std::string& name) const { 189 for (ChannelMap::const_iterator iter = channels_.begin(); 190 iter != channels_.end(); 191 ++iter) { 192 if (iter->second->name() == name) { 193 return iter->second; 194 } 195 } 196 return NULL; 197 } 198 199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl( 200 int component) { 201 return worker_thread_->Invoke<TransportChannelImpl*>(Bind( 202 &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component)); 203 } 204 205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w( 206 int component) { 207 ASSERT(talk_base::Thread::Current() == worker_thread_); 208 TransportChannelImpl* impl = transport_->get()->GetChannel(component); 209 if (impl == NULL) { 210 impl = transport_->get()->CreateChannel(component); 211 } 212 return impl; 213 } 214 215 void TransportProxy::SetupChannelProxy( 216 int component, TransportChannelProxy* transproxy) { 217 worker_thread_->Invoke<void>(Bind( 218 &TransportProxy::SetupChannelProxy_w, this, component, transproxy)); 219 } 220 221 void TransportProxy::SetupChannelProxy_w( 222 int component, TransportChannelProxy* transproxy) { 223 ASSERT(talk_base::Thread::Current() == worker_thread_); 224 TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component); 225 ASSERT(impl != NULL); 226 transproxy->SetImplementation(impl); 227 } 228 229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy, 230 TransportChannelImpl* impl) { 231 worker_thread_->Invoke<void>(Bind( 232 &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl)); 233 } 234 235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy, 236 TransportChannelImpl* impl) { 237 ASSERT(talk_base::Thread::Current() == worker_thread_); 238 ASSERT(proxy != NULL); 239 proxy->SetImplementation(impl); 240 } 241 242 // This function muxes |this| onto |target| by repointing |this| at 243 // |target|'s transport and setting our TransportChannelProxies 244 // to point to |target|'s underlying implementations. 245 bool TransportProxy::SetupMux(TransportProxy* target) { 246 // Bail out if there's nothing to do. 247 if (transport_ == target->transport_) { 248 return true; 249 } 250 251 // Run through all channels and remove any non-rtp transport channels before 252 // setting target transport channels. 253 for (ChannelMap::const_iterator iter = channels_.begin(); 254 iter != channels_.end(); ++iter) { 255 if (!target->transport_->get()->HasChannel(iter->first)) { 256 // Remove if channel doesn't exist in |transport_|. 257 ReplaceChannelProxyImpl(iter->second, NULL); 258 } else { 259 // Replace the impl for all the TransportProxyChannels with the channels 260 // from |target|'s transport. Fail if there's not an exact match. 261 ReplaceChannelProxyImpl( 262 iter->second, target->transport_->get()->CreateChannel(iter->first)); 263 } 264 } 265 266 // Now replace our transport. Must happen afterwards because 267 // it deletes all impls as a side effect. 268 transport_ = target->transport_; 269 transport_->get()->SignalCandidatesReady.connect( 270 this, &TransportProxy::OnTransportCandidatesReady); 271 set_candidates_allocated(target->candidates_allocated()); 272 return true; 273 } 274 275 void TransportProxy::SetIceRole(IceRole role) { 276 transport_->get()->SetIceRole(role); 277 } 278 279 bool TransportProxy::SetLocalTransportDescription( 280 const TransportDescription& description, ContentAction action) { 281 // If this is an answer, finalize the negotiation. 282 if (action == CA_ANSWER) { 283 CompleteNegotiation(); 284 } 285 return transport_->get()->SetLocalTransportDescription(description, action); 286 } 287 288 bool TransportProxy::SetRemoteTransportDescription( 289 const TransportDescription& description, ContentAction action) { 290 // If this is an answer, finalize the negotiation. 291 if (action == CA_ANSWER) { 292 CompleteNegotiation(); 293 } 294 return transport_->get()->SetRemoteTransportDescription(description, action); 295 } 296 297 void TransportProxy::OnSignalingReady() { 298 // If we're starting a new allocation sequence, reset our state. 299 set_candidates_allocated(false); 300 transport_->get()->OnSignalingReady(); 301 } 302 303 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates, 304 std::string* error) { 305 // Ensure the transport is negotiated before handling candidates. 306 // TODO(juberti): Remove this once everybody calls SetLocalTD. 307 CompleteNegotiation(); 308 309 // Verify each candidate before passing down to transport layer. 310 for (Candidates::const_iterator cand = candidates.begin(); 311 cand != candidates.end(); ++cand) { 312 if (!transport_->get()->VerifyCandidate(*cand, error)) 313 return false; 314 if (!HasChannel(cand->component())) { 315 *error = "Candidate has unknown component: " + cand->ToString() + 316 " for content: " + content_name_; 317 return false; 318 } 319 } 320 transport_->get()->OnRemoteCandidates(candidates); 321 return true; 322 } 323 324 void TransportProxy::SetIdentity( 325 talk_base::SSLIdentity* identity) { 326 transport_->get()->SetIdentity(identity); 327 } 328 329 std::string BaseSession::StateToString(State state) { 330 switch (state) { 331 case Session::STATE_INIT: 332 return "STATE_INIT"; 333 case Session::STATE_SENTINITIATE: 334 return "STATE_SENTINITIATE"; 335 case Session::STATE_RECEIVEDINITIATE: 336 return "STATE_RECEIVEDINITIATE"; 337 case Session::STATE_SENTPRACCEPT: 338 return "STATE_SENTPRACCEPT"; 339 case Session::STATE_SENTACCEPT: 340 return "STATE_SENTACCEPT"; 341 case Session::STATE_RECEIVEDPRACCEPT: 342 return "STATE_RECEIVEDPRACCEPT"; 343 case Session::STATE_RECEIVEDACCEPT: 344 return "STATE_RECEIVEDACCEPT"; 345 case Session::STATE_SENTMODIFY: 346 return "STATE_SENTMODIFY"; 347 case Session::STATE_RECEIVEDMODIFY: 348 return "STATE_RECEIVEDMODIFY"; 349 case Session::STATE_SENTREJECT: 350 return "STATE_SENTREJECT"; 351 case Session::STATE_RECEIVEDREJECT: 352 return "STATE_RECEIVEDREJECT"; 353 case Session::STATE_SENTREDIRECT: 354 return "STATE_SENTREDIRECT"; 355 case Session::STATE_SENTTERMINATE: 356 return "STATE_SENTTERMINATE"; 357 case Session::STATE_RECEIVEDTERMINATE: 358 return "STATE_RECEIVEDTERMINATE"; 359 case Session::STATE_INPROGRESS: 360 return "STATE_INPROGRESS"; 361 case Session::STATE_DEINIT: 362 return "STATE_DEINIT"; 363 default: 364 break; 365 } 366 return "STATE_" + talk_base::ToString(state); 367 } 368 369 BaseSession::BaseSession(talk_base::Thread* signaling_thread, 370 talk_base::Thread* worker_thread, 371 PortAllocator* port_allocator, 372 const std::string& sid, 373 const std::string& content_type, 374 bool initiator) 375 : state_(STATE_INIT), 376 error_(ERROR_NONE), 377 signaling_thread_(signaling_thread), 378 worker_thread_(worker_thread), 379 port_allocator_(port_allocator), 380 sid_(sid), 381 content_type_(content_type), 382 transport_type_(NS_GINGLE_P2P), 383 initiator_(initiator), 384 identity_(NULL), 385 local_description_(NULL), 386 remote_description_(NULL), 387 ice_tiebreaker_(talk_base::CreateRandomId64()), 388 role_switch_(false) { 389 ASSERT(signaling_thread->IsCurrent()); 390 } 391 392 BaseSession::~BaseSession() { 393 ASSERT(signaling_thread()->IsCurrent()); 394 395 ASSERT(state_ != STATE_DEINIT); 396 LogState(state_, STATE_DEINIT); 397 state_ = STATE_DEINIT; 398 SignalState(this, state_); 399 400 for (TransportMap::iterator iter = transports_.begin(); 401 iter != transports_.end(); ++iter) { 402 delete iter->second; 403 } 404 405 delete remote_description_; 406 delete local_description_; 407 } 408 409 bool BaseSession::SetIdentity(talk_base::SSLIdentity* identity) { 410 if (identity_) 411 return false; 412 identity_ = identity; 413 for (TransportMap::iterator iter = transports_.begin(); 414 iter != transports_.end(); ++iter) { 415 iter->second->SetIdentity(identity_); 416 } 417 return true; 418 } 419 420 bool BaseSession::PushdownTransportDescription(ContentSource source, 421 ContentAction action) { 422 if (source == CS_LOCAL) { 423 return PushdownLocalTransportDescription(local_description_, action); 424 } 425 return PushdownRemoteTransportDescription(remote_description_, action); 426 } 427 428 bool BaseSession::PushdownLocalTransportDescription( 429 const SessionDescription* sdesc, 430 ContentAction action) { 431 // Update the Transports with the right information, and trigger them to 432 // start connecting. 433 for (TransportMap::iterator iter = transports_.begin(); 434 iter != transports_.end(); ++iter) { 435 // If no transport info was in this session description, ret == false 436 // and we just skip this one. 437 TransportDescription tdesc; 438 bool ret = GetTransportDescription( 439 sdesc, iter->second->content_name(), &tdesc); 440 if (ret) { 441 if (!iter->second->SetLocalTransportDescription(tdesc, action)) { 442 return false; 443 } 444 445 iter->second->ConnectChannels(); 446 } 447 } 448 449 return true; 450 } 451 452 bool BaseSession::PushdownRemoteTransportDescription( 453 const SessionDescription* sdesc, 454 ContentAction action) { 455 // Update the Transports with the right information. 456 for (TransportMap::iterator iter = transports_.begin(); 457 iter != transports_.end(); ++iter) { 458 TransportDescription tdesc; 459 460 // If no transport info was in this session description, ret == false 461 // and we just skip this one. 462 bool ret = GetTransportDescription( 463 sdesc, iter->second->content_name(), &tdesc); 464 if (ret) { 465 if (!iter->second->SetRemoteTransportDescription(tdesc, action)) { 466 return false; 467 } 468 } 469 } 470 471 return true; 472 } 473 474 TransportChannel* BaseSession::CreateChannel(const std::string& content_name, 475 const std::string& channel_name, 476 int component) { 477 // We create the proxy "on demand" here because we need to support 478 // creating channels at any time, even before we send or receive 479 // initiate messages, which is before we create the transports. 480 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name); 481 return transproxy->CreateChannel(channel_name, component); 482 } 483 484 TransportChannel* BaseSession::GetChannel(const std::string& content_name, 485 int component) { 486 TransportProxy* transproxy = GetTransportProxy(content_name); 487 if (transproxy == NULL) 488 return NULL; 489 else 490 return transproxy->GetChannel(component); 491 } 492 493 void BaseSession::DestroyChannel(const std::string& content_name, 494 int component) { 495 TransportProxy* transproxy = GetTransportProxy(content_name); 496 ASSERT(transproxy != NULL); 497 transproxy->DestroyChannel(component); 498 } 499 500 TransportProxy* BaseSession::GetOrCreateTransportProxy( 501 const std::string& content_name) { 502 TransportProxy* transproxy = GetTransportProxy(content_name); 503 if (transproxy) 504 return transproxy; 505 506 Transport* transport = CreateTransport(content_name); 507 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED); 508 transport->SetIceTiebreaker(ice_tiebreaker_); 509 // TODO: Connect all the Transport signals to TransportProxy 510 // then to the BaseSession. 511 transport->SignalConnecting.connect( 512 this, &BaseSession::OnTransportConnecting); 513 transport->SignalWritableState.connect( 514 this, &BaseSession::OnTransportWritable); 515 transport->SignalRequestSignaling.connect( 516 this, &BaseSession::OnTransportRequestSignaling); 517 transport->SignalTransportError.connect( 518 this, &BaseSession::OnTransportSendError); 519 transport->SignalRouteChange.connect( 520 this, &BaseSession::OnTransportRouteChange); 521 transport->SignalCandidatesAllocationDone.connect( 522 this, &BaseSession::OnTransportCandidatesAllocationDone); 523 transport->SignalRoleConflict.connect( 524 this, &BaseSession::OnRoleConflict); 525 526 transproxy = new TransportProxy(worker_thread_, sid_, content_name, 527 new TransportWrapper(transport)); 528 transproxy->SignalCandidatesReady.connect( 529 this, &BaseSession::OnTransportProxyCandidatesReady); 530 transports_[content_name] = transproxy; 531 532 return transproxy; 533 } 534 535 Transport* BaseSession::GetTransport(const std::string& content_name) { 536 TransportProxy* transproxy = GetTransportProxy(content_name); 537 if (transproxy == NULL) 538 return NULL; 539 return transproxy->impl(); 540 } 541 542 TransportProxy* BaseSession::GetTransportProxy( 543 const std::string& content_name) { 544 TransportMap::iterator iter = transports_.find(content_name); 545 return (iter != transports_.end()) ? iter->second : NULL; 546 } 547 548 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) { 549 for (TransportMap::iterator iter = transports_.begin(); 550 iter != transports_.end(); ++iter) { 551 TransportProxy* transproxy = iter->second; 552 if (transproxy->impl() == transport) { 553 return transproxy; 554 } 555 } 556 return NULL; 557 } 558 559 TransportProxy* BaseSession::GetFirstTransportProxy() { 560 if (transports_.empty()) 561 return NULL; 562 return transports_.begin()->second; 563 } 564 565 void BaseSession::DestroyTransportProxy( 566 const std::string& content_name) { 567 TransportMap::iterator iter = transports_.find(content_name); 568 if (iter != transports_.end()) { 569 delete iter->second; 570 transports_.erase(content_name); 571 } 572 } 573 574 cricket::Transport* BaseSession::CreateTransport( 575 const std::string& content_name) { 576 ASSERT(transport_type_ == NS_GINGLE_P2P); 577 return new cricket::DtlsTransport<P2PTransport>( 578 signaling_thread(), worker_thread(), content_name, 579 port_allocator(), identity_); 580 } 581 582 bool BaseSession::GetStats(SessionStats* stats) { 583 for (TransportMap::iterator iter = transports_.begin(); 584 iter != transports_.end(); ++iter) { 585 std::string proxy_id = iter->second->content_name(); 586 // We are ignoring not-yet-instantiated transports. 587 if (iter->second->impl()) { 588 std::string transport_id = iter->second->impl()->content_name(); 589 stats->proxy_to_transport[proxy_id] = transport_id; 590 if (stats->transport_stats.find(transport_id) 591 == stats->transport_stats.end()) { 592 TransportStats subinfos; 593 if (!iter->second->impl()->GetStats(&subinfos)) { 594 return false; 595 } 596 stats->transport_stats[transport_id] = subinfos; 597 } 598 } 599 } 600 return true; 601 } 602 603 void BaseSession::SetState(State state) { 604 ASSERT(signaling_thread_->IsCurrent()); 605 if (state != state_) { 606 LogState(state_, state); 607 state_ = state; 608 SignalState(this, state_); 609 signaling_thread_->Post(this, MSG_STATE); 610 } 611 SignalNewDescription(); 612 } 613 614 void BaseSession::SetError(Error error) { 615 ASSERT(signaling_thread_->IsCurrent()); 616 if (error != error_) { 617 error_ = error; 618 SignalError(this, error); 619 } 620 } 621 622 void BaseSession::OnSignalingReady() { 623 ASSERT(signaling_thread()->IsCurrent()); 624 for (TransportMap::iterator iter = transports_.begin(); 625 iter != transports_.end(); ++iter) { 626 iter->second->OnSignalingReady(); 627 } 628 } 629 630 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to 631 // start, remove this method once everyone calls PushdownLocalTD. 632 void BaseSession::SpeculativelyConnectAllTransportChannels() { 633 // Put all transports into the connecting state. 634 for (TransportMap::iterator iter = transports_.begin(); 635 iter != transports_.end(); ++iter) { 636 iter->second->ConnectChannels(); 637 } 638 } 639 640 bool BaseSession::OnRemoteCandidates(const std::string& content_name, 641 const Candidates& candidates, 642 std::string* error) { 643 // Give candidates to the appropriate transport, and tell that transport 644 // to start connecting, if it's not already doing so. 645 TransportProxy* transproxy = GetTransportProxy(content_name); 646 if (!transproxy) { 647 *error = "Unknown content name " + content_name; 648 return false; 649 } 650 if (!transproxy->OnRemoteCandidates(candidates, error)) { 651 return false; 652 } 653 // TODO(juberti): Remove this call once we can be sure that we always have 654 // a local transport description (which will trigger the connection). 655 transproxy->ConnectChannels(); 656 return true; 657 } 658 659 bool BaseSession::MaybeEnableMuxingSupport() { 660 // We need both a local and remote description to decide if we should mux. 661 if ((state_ == STATE_SENTINITIATE || 662 state_ == STATE_RECEIVEDINITIATE) && 663 ((local_description_ == NULL) || 664 (remote_description_ == NULL))) { 665 return false; 666 } 667 668 // In order to perform the multiplexing, we need all proxies to be in the 669 // negotiated state, i.e. to have implementations underneath. 670 // Ensure that this is the case, regardless of whether we are going to mux. 671 for (TransportMap::iterator iter = transports_.begin(); 672 iter != transports_.end(); ++iter) { 673 ASSERT(iter->second->negotiated()); 674 if (!iter->second->negotiated()) 675 return false; 676 } 677 678 // If both sides agree to BUNDLE, mux all the specified contents onto the 679 // transport belonging to the first content name in the BUNDLE group. 680 // If the contents are already muxed, this will be a no-op. 681 // TODO(juberti): Should this check that local and remote have configured 682 // BUNDLE the same way? 683 bool candidates_allocated = IsCandidateAllocationDone(); 684 const ContentGroup* local_bundle_group = 685 local_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 686 const ContentGroup* remote_bundle_group = 687 remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 688 if (local_bundle_group && remote_bundle_group && 689 local_bundle_group->FirstContentName()) { 690 const std::string* content_name = local_bundle_group->FirstContentName(); 691 const ContentInfo* content = 692 local_description_->GetContentByName(*content_name); 693 ASSERT(content != NULL); 694 if (!SetSelectedProxy(content->name, local_bundle_group)) { 695 LOG(LS_WARNING) << "Failed to set up BUNDLE"; 696 return false; 697 } 698 699 // If we weren't done gathering before, we might be done now, as a result 700 // of enabling mux. 701 LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: " 702 << *content_name; 703 if (!candidates_allocated) { 704 MaybeCandidateAllocationDone(); 705 } 706 } else { 707 LOG(LS_INFO) << "No BUNDLE information, not bundling."; 708 } 709 return true; 710 } 711 712 bool BaseSession::SetSelectedProxy(const std::string& content_name, 713 const ContentGroup* muxed_group) { 714 TransportProxy* selected_proxy = GetTransportProxy(content_name); 715 if (!selected_proxy) { 716 return false; 717 } 718 719 ASSERT(selected_proxy->negotiated()); 720 for (TransportMap::iterator iter = transports_.begin(); 721 iter != transports_.end(); ++iter) { 722 // If content is part of the mux group, then repoint its proxy at the 723 // transport object that we have chosen to mux onto. If the proxy 724 // is already pointing at the right object, it will be a no-op. 725 if (muxed_group->HasContentName(iter->first) && 726 !iter->second->SetupMux(selected_proxy)) { 727 return false; 728 } 729 } 730 return true; 731 } 732 733 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) { 734 // TODO(juberti): This is a clunky way of processing the done signal. Instead, 735 // TransportProxy should receive the done signal directly, set its allocated 736 // flag internally, and then reissue the done signal to Session. 737 // Overall we should make TransportProxy receive *all* the signals from 738 // Transport, since this removes the need to manually iterate over all 739 // the transports, as is needed to make sure signals are handled properly 740 // when BUNDLEing. 741 #if 0 742 ASSERT(!IsCandidateAllocationDone()); 743 #endif 744 for (TransportMap::iterator iter = transports_.begin(); 745 iter != transports_.end(); ++iter) { 746 if (iter->second->impl() == transport) { 747 iter->second->set_candidates_allocated(true); 748 } 749 } 750 MaybeCandidateAllocationDone(); 751 } 752 753 bool BaseSession::IsCandidateAllocationDone() const { 754 for (TransportMap::const_iterator iter = transports_.begin(); 755 iter != transports_.end(); ++iter) { 756 if (!iter->second->candidates_allocated()) 757 return false; 758 } 759 return true; 760 } 761 762 void BaseSession::MaybeCandidateAllocationDone() { 763 if (IsCandidateAllocationDone()) { 764 LOG(LS_INFO) << "Candidate gathering is complete."; 765 OnCandidatesAllocationDone(); 766 } 767 } 768 769 void BaseSession::OnRoleConflict() { 770 if (role_switch_) { 771 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport."; 772 return; 773 } 774 775 role_switch_ = true; 776 for (TransportMap::iterator iter = transports_.begin(); 777 iter != transports_.end(); ++iter) { 778 // Role will be reverse of initial role setting. 779 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING; 780 iter->second->SetIceRole(role); 781 } 782 } 783 784 void BaseSession::LogState(State old_state, State new_state) { 785 LOG(LS_INFO) << "Session:" << id() 786 << " Old state:" << StateToString(old_state) 787 << " New state:" << StateToString(new_state) 788 << " Type:" << content_type() 789 << " Transport:" << transport_type(); 790 } 791 792 bool BaseSession::GetTransportDescription(const SessionDescription* description, 793 const std::string& content_name, 794 TransportDescription* tdesc) { 795 if (!description || !tdesc) { 796 return false; 797 } 798 const TransportInfo* transport_info = 799 description->GetTransportInfoByName(content_name); 800 if (!transport_info) { 801 return false; 802 } 803 *tdesc = transport_info->description; 804 return true; 805 } 806 807 void BaseSession::SignalNewDescription() { 808 ContentAction action; 809 ContentSource source; 810 if (!GetContentAction(&action, &source)) { 811 return; 812 } 813 if (source == CS_LOCAL) { 814 SignalNewLocalDescription(this, action); 815 } else { 816 SignalNewRemoteDescription(this, action); 817 } 818 } 819 820 bool BaseSession::GetContentAction(ContentAction* action, 821 ContentSource* source) { 822 switch (state_) { 823 // new local description 824 case STATE_SENTINITIATE: 825 *action = CA_OFFER; 826 *source = CS_LOCAL; 827 break; 828 case STATE_SENTPRACCEPT: 829 *action = CA_PRANSWER; 830 *source = CS_LOCAL; 831 break; 832 case STATE_SENTACCEPT: 833 *action = CA_ANSWER; 834 *source = CS_LOCAL; 835 break; 836 // new remote description 837 case STATE_RECEIVEDINITIATE: 838 *action = CA_OFFER; 839 *source = CS_REMOTE; 840 break; 841 case STATE_RECEIVEDPRACCEPT: 842 *action = CA_PRANSWER; 843 *source = CS_REMOTE; 844 break; 845 case STATE_RECEIVEDACCEPT: 846 *action = CA_ANSWER; 847 *source = CS_REMOTE; 848 break; 849 default: 850 return false; 851 } 852 return true; 853 } 854 855 void BaseSession::OnMessage(talk_base::Message *pmsg) { 856 switch (pmsg->message_id) { 857 case MSG_TIMEOUT: 858 // Session timeout has occured. 859 SetError(ERROR_TIME); 860 break; 861 862 case MSG_STATE: 863 switch (state_) { 864 case STATE_SENTACCEPT: 865 case STATE_RECEIVEDACCEPT: 866 SetState(STATE_INPROGRESS); 867 break; 868 869 default: 870 // Explicitly ignoring some states here. 871 break; 872 } 873 break; 874 } 875 } 876 877 Session::Session(SessionManager* session_manager, 878 const std::string& local_name, 879 const std::string& initiator_name, 880 const std::string& sid, 881 const std::string& content_type, 882 SessionClient* client) 883 : BaseSession(session_manager->signaling_thread(), 884 session_manager->worker_thread(), 885 session_manager->port_allocator(), 886 sid, content_type, initiator_name == local_name) { 887 ASSERT(client != NULL); 888 session_manager_ = session_manager; 889 local_name_ = local_name; 890 initiator_name_ = initiator_name; 891 transport_parser_ = new P2PTransportParser(); 892 client_ = client; 893 initiate_acked_ = false; 894 current_protocol_ = PROTOCOL_HYBRID; 895 } 896 897 Session::~Session() { 898 delete transport_parser_; 899 } 900 901 bool Session::Initiate(const std::string &to, 902 const SessionDescription* sdesc) { 903 ASSERT(signaling_thread()->IsCurrent()); 904 SessionError error; 905 906 // Only from STATE_INIT 907 if (state() != STATE_INIT) 908 return false; 909 910 // Setup for signaling. 911 set_remote_name(to); 912 set_local_description(sdesc); 913 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()), 914 &error)) { 915 LOG(LS_ERROR) << "Could not create transports: " << error.text; 916 return false; 917 } 918 919 if (!SendInitiateMessage(sdesc, &error)) { 920 LOG(LS_ERROR) << "Could not send initiate message: " << error.text; 921 return false; 922 } 923 924 // We need to connect transport proxy and impl here so that we can process 925 // the TransportDescriptions. 926 SpeculativelyConnectAllTransportChannels(); 927 928 PushdownTransportDescription(CS_LOCAL, CA_OFFER); 929 SetState(Session::STATE_SENTINITIATE); 930 return true; 931 } 932 933 bool Session::Accept(const SessionDescription* sdesc) { 934 ASSERT(signaling_thread()->IsCurrent()); 935 936 // Only if just received initiate 937 if (state() != STATE_RECEIVEDINITIATE) 938 return false; 939 940 // Setup for signaling. 941 set_local_description(sdesc); 942 943 SessionError error; 944 if (!SendAcceptMessage(sdesc, &error)) { 945 LOG(LS_ERROR) << "Could not send accept message: " << error.text; 946 return false; 947 } 948 // TODO(juberti): Add BUNDLE support to transport-info messages. 949 PushdownTransportDescription(CS_LOCAL, CA_ANSWER); 950 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 951 SetState(Session::STATE_SENTACCEPT); 952 return true; 953 } 954 955 bool Session::Reject(const std::string& reason) { 956 ASSERT(signaling_thread()->IsCurrent()); 957 958 // Reject is sent in response to an initiate or modify, to reject the 959 // request 960 if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY) 961 return false; 962 963 SessionError error; 964 if (!SendRejectMessage(reason, &error)) { 965 LOG(LS_ERROR) << "Could not send reject message: " << error.text; 966 return false; 967 } 968 969 SetState(STATE_SENTREJECT); 970 return true; 971 } 972 973 bool Session::TerminateWithReason(const std::string& reason) { 974 ASSERT(signaling_thread()->IsCurrent()); 975 976 // Either side can terminate, at any time. 977 switch (state()) { 978 case STATE_SENTTERMINATE: 979 case STATE_RECEIVEDTERMINATE: 980 return false; 981 982 case STATE_SENTREJECT: 983 case STATE_RECEIVEDREJECT: 984 // We don't need to send terminate if we sent or received a reject... 985 // it's implicit. 986 break; 987 988 default: 989 SessionError error; 990 if (!SendTerminateMessage(reason, &error)) { 991 LOG(LS_ERROR) << "Could not send terminate message: " << error.text; 992 return false; 993 } 994 break; 995 } 996 997 SetState(STATE_SENTTERMINATE); 998 return true; 999 } 1000 1001 bool Session::SendInfoMessage(const XmlElements& elems, 1002 const std::string& remote_name) { 1003 ASSERT(signaling_thread()->IsCurrent()); 1004 SessionError error; 1005 if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) { 1006 LOG(LS_ERROR) << "Could not send info message " << error.text; 1007 return false; 1008 } 1009 return true; 1010 } 1011 1012 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) { 1013 XmlElements elems; 1014 WriteError write_error; 1015 if (!WriteDescriptionInfo(current_protocol_, 1016 contents, 1017 GetContentParsers(), 1018 &elems, &write_error)) { 1019 LOG(LS_ERROR) << "Could not write description info message: " 1020 << write_error.text; 1021 return false; 1022 } 1023 SessionError error; 1024 if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) { 1025 LOG(LS_ERROR) << "Could not send description info message: " 1026 << error.text; 1027 return false; 1028 } 1029 return true; 1030 } 1031 1032 TransportInfos Session::GetEmptyTransportInfos( 1033 const ContentInfos& contents) const { 1034 TransportInfos tinfos; 1035 for (ContentInfos::const_iterator content = contents.begin(); 1036 content != contents.end(); ++content) { 1037 tinfos.push_back(TransportInfo(content->name, 1038 TransportDescription(transport_type(), 1039 std::string(), 1040 std::string()))); 1041 } 1042 return tinfos; 1043 } 1044 1045 bool Session::OnRemoteCandidates( 1046 const TransportInfos& tinfos, ParseError* error) { 1047 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1048 tinfo != tinfos.end(); ++tinfo) { 1049 std::string str_error; 1050 if (!BaseSession::OnRemoteCandidates( 1051 tinfo->content_name, tinfo->description.candidates, &str_error)) { 1052 return BadParse(str_error, error); 1053 } 1054 } 1055 return true; 1056 } 1057 1058 bool Session::CreateTransportProxies(const TransportInfos& tinfos, 1059 SessionError* error) { 1060 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1061 tinfo != tinfos.end(); ++tinfo) { 1062 if (tinfo->description.transport_type != transport_type()) { 1063 error->SetText("No supported transport in offer."); 1064 return false; 1065 } 1066 1067 GetOrCreateTransportProxy(tinfo->content_name); 1068 } 1069 return true; 1070 } 1071 1072 TransportParserMap Session::GetTransportParsers() { 1073 TransportParserMap parsers; 1074 parsers[transport_type()] = transport_parser_; 1075 return parsers; 1076 } 1077 1078 CandidateTranslatorMap Session::GetCandidateTranslators() { 1079 CandidateTranslatorMap translators; 1080 // NOTE: This technique makes it impossible to parse G-ICE 1081 // candidates in session-initiate messages because the channels 1082 // aren't yet created at that point. Since we don't use candidates 1083 // in session-initiate messages, we should be OK. Once we switch to 1084 // ICE, this translation shouldn't be necessary. 1085 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1086 iter != transport_proxies().end(); ++iter) { 1087 translators[iter->first] = iter->second; 1088 } 1089 return translators; 1090 } 1091 1092 ContentParserMap Session::GetContentParsers() { 1093 ContentParserMap parsers; 1094 parsers[content_type()] = client_; 1095 // We need to be able parse both RTP-based and SCTP-based Jingle 1096 // with the same client. 1097 if (content_type() == NS_JINGLE_RTP) { 1098 parsers[NS_JINGLE_DRAFT_SCTP] = client_; 1099 } 1100 return parsers; 1101 } 1102 1103 void Session::OnTransportRequestSignaling(Transport* transport) { 1104 ASSERT(signaling_thread()->IsCurrent()); 1105 TransportProxy* transproxy = GetTransportProxy(transport); 1106 ASSERT(transproxy != NULL); 1107 if (transproxy) { 1108 // Reset candidate allocation status for the transport proxy. 1109 transproxy->set_candidates_allocated(false); 1110 } 1111 SignalRequestSignaling(this); 1112 } 1113 1114 void Session::OnTransportConnecting(Transport* transport) { 1115 // This is an indication that we should begin watching the writability 1116 // state of the transport. 1117 OnTransportWritable(transport); 1118 } 1119 1120 void Session::OnTransportWritable(Transport* transport) { 1121 ASSERT(signaling_thread()->IsCurrent()); 1122 1123 // If the transport is not writable, start a timer to make sure that it 1124 // becomes writable within a reasonable amount of time. If it does not, we 1125 // terminate since we can't actually send data. If the transport is writable, 1126 // cancel the timer. Note that writability transitions may occur repeatedly 1127 // during the lifetime of the session. 1128 signaling_thread()->Clear(this, MSG_TIMEOUT); 1129 if (transport->HasChannels() && !transport->writable()) { 1130 signaling_thread()->PostDelayed( 1131 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); 1132 } 1133 } 1134 1135 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy, 1136 const Candidates& candidates) { 1137 ASSERT(signaling_thread()->IsCurrent()); 1138 if (transproxy != NULL) { 1139 if (initiator() && !initiate_acked_) { 1140 // TODO: This is to work around server re-ordering 1141 // messages. We send the candidates once the session-initiate 1142 // is acked. Once we have fixed the server to guarantee message 1143 // order, we can remove this case. 1144 transproxy->AddUnsentCandidates(candidates); 1145 } else { 1146 if (!transproxy->negotiated()) { 1147 transproxy->AddSentCandidates(candidates); 1148 } 1149 SessionError error; 1150 if (!SendTransportInfoMessage(transproxy, candidates, &error)) { 1151 LOG(LS_ERROR) << "Could not send transport info message: " 1152 << error.text; 1153 return; 1154 } 1155 } 1156 } 1157 } 1158 1159 void Session::OnTransportSendError(Transport* transport, 1160 const buzz::XmlElement* stanza, 1161 const buzz::QName& name, 1162 const std::string& type, 1163 const std::string& text, 1164 const buzz::XmlElement* extra_info) { 1165 ASSERT(signaling_thread()->IsCurrent()); 1166 SignalErrorMessage(this, stanza, name, type, text, extra_info); 1167 } 1168 1169 void Session::OnIncomingMessage(const SessionMessage& msg) { 1170 ASSERT(signaling_thread()->IsCurrent()); 1171 ASSERT(state() == STATE_INIT || msg.from == remote_name()); 1172 1173 if (current_protocol_== PROTOCOL_HYBRID) { 1174 if (msg.protocol == PROTOCOL_GINGLE) { 1175 current_protocol_ = PROTOCOL_GINGLE; 1176 } else { 1177 current_protocol_ = PROTOCOL_JINGLE; 1178 } 1179 } 1180 1181 bool valid = false; 1182 MessageError error; 1183 switch (msg.type) { 1184 case ACTION_SESSION_INITIATE: 1185 valid = OnInitiateMessage(msg, &error); 1186 break; 1187 case ACTION_SESSION_INFO: 1188 valid = OnInfoMessage(msg); 1189 break; 1190 case ACTION_SESSION_ACCEPT: 1191 valid = OnAcceptMessage(msg, &error); 1192 break; 1193 case ACTION_SESSION_REJECT: 1194 valid = OnRejectMessage(msg, &error); 1195 break; 1196 case ACTION_SESSION_TERMINATE: 1197 valid = OnTerminateMessage(msg, &error); 1198 break; 1199 case ACTION_TRANSPORT_INFO: 1200 valid = OnTransportInfoMessage(msg, &error); 1201 break; 1202 case ACTION_TRANSPORT_ACCEPT: 1203 valid = OnTransportAcceptMessage(msg, &error); 1204 break; 1205 case ACTION_DESCRIPTION_INFO: 1206 valid = OnDescriptionInfoMessage(msg, &error); 1207 break; 1208 default: 1209 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST, 1210 "unknown session message type", 1211 &error); 1212 } 1213 1214 if (valid) { 1215 SendAcknowledgementMessage(msg.stanza); 1216 } else { 1217 SignalErrorMessage(this, msg.stanza, error.type, 1218 "modify", error.text, NULL); 1219 } 1220 } 1221 1222 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza, 1223 const buzz::XmlElement* response_stanza, 1224 const SessionMessage& msg) { 1225 ASSERT(signaling_thread()->IsCurrent()); 1226 1227 if (msg.type == ACTION_SESSION_INITIATE) { 1228 OnInitiateAcked(); 1229 } 1230 } 1231 1232 void Session::OnInitiateAcked() { 1233 // TODO: This is to work around server re-ordering 1234 // messages. We send the candidates once the session-initiate 1235 // is acked. Once we have fixed the server to guarantee message 1236 // order, we can remove this case. 1237 if (!initiate_acked_) { 1238 initiate_acked_ = true; 1239 SessionError error; 1240 SendAllUnsentTransportInfoMessages(&error); 1241 } 1242 } 1243 1244 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza, 1245 const buzz::XmlElement* error_stanza) { 1246 ASSERT(signaling_thread()->IsCurrent()); 1247 1248 SessionMessage msg; 1249 ParseError parse_error; 1250 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) { 1251 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text 1252 << ":" << orig_stanza; 1253 return; 1254 } 1255 1256 // If the error is a session redirect, call OnRedirectError, which will 1257 // continue the session with a new remote JID. 1258 SessionRedirect redirect; 1259 if (FindSessionRedirect(error_stanza, &redirect)) { 1260 SessionError error; 1261 if (!OnRedirectError(redirect, &error)) { 1262 // TODO: Should we send a message back? The standard 1263 // says nothing about it. 1264 LOG(LS_ERROR) << "Failed to redirect: " << error.text; 1265 SetError(ERROR_RESPONSE); 1266 } 1267 return; 1268 } 1269 1270 std::string error_type = "cancel"; 1271 1272 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR); 1273 if (error) { 1274 error_type = error->Attr(buzz::QN_TYPE); 1275 1276 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n" 1277 << "in response to:\n" << orig_stanza->Str(); 1278 } else { 1279 // don't crash if <error> is missing 1280 LOG(LS_ERROR) << "Session error without <error/> element, ignoring"; 1281 return; 1282 } 1283 1284 if (msg.type == ACTION_TRANSPORT_INFO) { 1285 // Transport messages frequently generate errors because they are sent right 1286 // when we detect a network failure. For that reason, we ignore such 1287 // errors, because if we do not establish writability again, we will 1288 // terminate anyway. The exceptions are transport-specific error tags, 1289 // which we pass on to the respective transport. 1290 } else if ((error_type != "continue") && (error_type != "wait")) { 1291 // We do not set an error if the other side said it is okay to continue 1292 // (possibly after waiting). These errors can be ignored. 1293 SetError(ERROR_RESPONSE); 1294 } 1295 } 1296 1297 bool Session::OnInitiateMessage(const SessionMessage& msg, 1298 MessageError* error) { 1299 if (!CheckState(STATE_INIT, error)) 1300 return false; 1301 1302 SessionInitiate init; 1303 if (!ParseSessionInitiate(msg.protocol, msg.action_elem, 1304 GetContentParsers(), GetTransportParsers(), 1305 GetCandidateTranslators(), 1306 &init, error)) 1307 return false; 1308 1309 SessionError session_error; 1310 if (!CreateTransportProxies(init.transports, &session_error)) { 1311 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE, 1312 session_error.text, error); 1313 } 1314 1315 set_remote_name(msg.from); 1316 set_initiator_name(msg.initiator); 1317 set_remote_description(new SessionDescription(init.ClearContents(), 1318 init.transports, 1319 init.groups)); 1320 // Updating transport with TransportDescription. 1321 PushdownTransportDescription(CS_REMOTE, CA_OFFER); 1322 SetState(STATE_RECEIVEDINITIATE); 1323 1324 // Users of Session may listen to state change and call Reject(). 1325 if (state() != STATE_SENTREJECT) { 1326 if (!OnRemoteCandidates(init.transports, error)) 1327 return false; 1328 1329 // TODO(juberti): Auto-generate and push down the local transport answer. 1330 // This is necessary for trickling to work with RFC 5245 ICE. 1331 } 1332 return true; 1333 } 1334 1335 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) { 1336 if (!CheckState(STATE_SENTINITIATE, error)) 1337 return false; 1338 1339 SessionAccept accept; 1340 if (!ParseSessionAccept(msg.protocol, msg.action_elem, 1341 GetContentParsers(), GetTransportParsers(), 1342 GetCandidateTranslators(), 1343 &accept, error)) { 1344 return false; 1345 } 1346 1347 // If we get an accept, we can assume the initiate has been 1348 // received, even if we haven't gotten an IQ response. 1349 OnInitiateAcked(); 1350 1351 set_remote_description(new SessionDescription(accept.ClearContents(), 1352 accept.transports, 1353 accept.groups)); 1354 // Updating transport with TransportDescription. 1355 PushdownTransportDescription(CS_REMOTE, CA_ANSWER); 1356 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 1357 SetState(STATE_RECEIVEDACCEPT); 1358 1359 if (!OnRemoteCandidates(accept.transports, error)) 1360 return false; 1361 1362 return true; 1363 } 1364 1365 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) { 1366 if (!CheckState(STATE_SENTINITIATE, error)) 1367 return false; 1368 1369 SetState(STATE_RECEIVEDREJECT); 1370 return true; 1371 } 1372 1373 bool Session::OnInfoMessage(const SessionMessage& msg) { 1374 SignalInfoMessage(this, msg.action_elem); 1375 return true; 1376 } 1377 1378 bool Session::OnTerminateMessage(const SessionMessage& msg, 1379 MessageError* error) { 1380 SessionTerminate term; 1381 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error)) 1382 return false; 1383 1384 SignalReceivedTerminateReason(this, term.reason); 1385 if (term.debug_reason != buzz::STR_EMPTY) { 1386 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason; 1387 } 1388 1389 SetState(STATE_RECEIVEDTERMINATE); 1390 return true; 1391 } 1392 1393 bool Session::OnTransportInfoMessage(const SessionMessage& msg, 1394 MessageError* error) { 1395 TransportInfos tinfos; 1396 if (!ParseTransportInfos(msg.protocol, msg.action_elem, 1397 initiator_description()->contents(), 1398 GetTransportParsers(), GetCandidateTranslators(), 1399 &tinfos, error)) 1400 return false; 1401 1402 if (!OnRemoteCandidates(tinfos, error)) 1403 return false; 1404 1405 return true; 1406 } 1407 1408 bool Session::OnTransportAcceptMessage(const SessionMessage& msg, 1409 MessageError* error) { 1410 // TODO: Currently here only for compatibility with 1411 // Gingle 1.1 clients (notably, Google Voice). 1412 return true; 1413 } 1414 1415 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg, 1416 MessageError* error) { 1417 if (!CheckState(STATE_INPROGRESS, error)) 1418 return false; 1419 1420 DescriptionInfo description_info; 1421 if (!ParseDescriptionInfo(msg.protocol, msg.action_elem, 1422 GetContentParsers(), GetTransportParsers(), 1423 GetCandidateTranslators(), 1424 &description_info, error)) { 1425 return false; 1426 } 1427 1428 ContentInfos& updated_contents = description_info.contents; 1429 1430 // TODO: Currently, reflector sends back 1431 // video stream updates even for an audio-only call, which causes 1432 // this to fail. Put this back once reflector is fixed. 1433 // 1434 // ContentInfos::iterator it; 1435 // First, ensure all updates are valid before modifying remote_description_. 1436 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1437 // if (remote_description()->GetContentByName(it->name) == NULL) { 1438 // return false; 1439 // } 1440 // } 1441 1442 // TODO: We used to replace contents from an update, but 1443 // that no longer works with partial updates. We need to figure out 1444 // a way to merge patial updates into contents. For now, users of 1445 // Session should listen to SignalRemoteDescriptionUpdate and handle 1446 // updates. They should not expect remote_description to be the 1447 // latest value. 1448 // 1449 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1450 // remote_description()->RemoveContentByName(it->name); 1451 // remote_description()->AddContent(it->name, it->type, it->description); 1452 // } 1453 // } 1454 1455 SignalRemoteDescriptionUpdate(this, updated_contents); 1456 return true; 1457 } 1458 1459 bool BareJidsEqual(const std::string& name1, 1460 const std::string& name2) { 1461 buzz::Jid jid1(name1); 1462 buzz::Jid jid2(name2); 1463 1464 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2); 1465 } 1466 1467 bool Session::OnRedirectError(const SessionRedirect& redirect, 1468 SessionError* error) { 1469 MessageError message_error; 1470 if (!CheckState(STATE_SENTINITIATE, &message_error)) { 1471 return BadWrite(message_error.text, error); 1472 } 1473 1474 if (!BareJidsEqual(remote_name(), redirect.target)) 1475 return BadWrite("Redirection not allowed: must be the same bare jid.", 1476 error); 1477 1478 // When we receive a redirect, we point the session at the new JID 1479 // and resend the candidates. 1480 set_remote_name(redirect.target); 1481 return (SendInitiateMessage(local_description(), error) && 1482 ResendAllTransportInfoMessages(error)); 1483 } 1484 1485 bool Session::CheckState(State expected, MessageError* error) { 1486 if (state() != expected) { 1487 // The server can deliver messages out of order/repeated for various 1488 // reasons. For example, if the server does not recive our iq response, 1489 // it could assume that the iq it sent was lost, and will then send 1490 // it again. Ideally, we should implement reliable messaging with 1491 // duplicate elimination. 1492 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED, 1493 "message not allowed in current state", 1494 error); 1495 } 1496 return true; 1497 } 1498 1499 void Session::SetError(Error error) { 1500 BaseSession::SetError(error); 1501 if (error != ERROR_NONE) 1502 signaling_thread()->Post(this, MSG_ERROR); 1503 } 1504 1505 void Session::OnMessage(talk_base::Message* pmsg) { 1506 // preserve this because BaseSession::OnMessage may modify it 1507 State orig_state = state(); 1508 1509 BaseSession::OnMessage(pmsg); 1510 1511 switch (pmsg->message_id) { 1512 case MSG_ERROR: 1513 TerminateWithReason(STR_TERMINATE_ERROR); 1514 break; 1515 1516 case MSG_STATE: 1517 switch (orig_state) { 1518 case STATE_SENTREJECT: 1519 case STATE_RECEIVEDREJECT: 1520 // Assume clean termination. 1521 Terminate(); 1522 break; 1523 1524 case STATE_SENTTERMINATE: 1525 case STATE_RECEIVEDTERMINATE: 1526 session_manager_->DestroySession(this); 1527 break; 1528 1529 default: 1530 // Explicitly ignoring some states here. 1531 break; 1532 } 1533 break; 1534 } 1535 } 1536 1537 bool Session::SendInitiateMessage(const SessionDescription* sdesc, 1538 SessionError* error) { 1539 SessionInitiate init; 1540 init.contents = sdesc->contents(); 1541 init.transports = GetEmptyTransportInfos(init.contents); 1542 init.groups = sdesc->groups(); 1543 return SendMessage(ACTION_SESSION_INITIATE, init, error); 1544 } 1545 1546 bool Session::WriteSessionAction( 1547 SignalingProtocol protocol, const SessionInitiate& init, 1548 XmlElements* elems, WriteError* error) { 1549 return WriteSessionInitiate(protocol, init.contents, init.transports, 1550 GetContentParsers(), GetTransportParsers(), 1551 GetCandidateTranslators(), init.groups, 1552 elems, error); 1553 } 1554 1555 bool Session::SendAcceptMessage(const SessionDescription* sdesc, 1556 SessionError* error) { 1557 XmlElements elems; 1558 if (!WriteSessionAccept(current_protocol_, 1559 sdesc->contents(), 1560 GetEmptyTransportInfos(sdesc->contents()), 1561 GetContentParsers(), GetTransportParsers(), 1562 GetCandidateTranslators(), sdesc->groups(), 1563 &elems, error)) { 1564 return false; 1565 } 1566 return SendMessage(ACTION_SESSION_ACCEPT, elems, error); 1567 } 1568 1569 bool Session::SendRejectMessage(const std::string& reason, 1570 SessionError* error) { 1571 SessionTerminate term(reason); 1572 return SendMessage(ACTION_SESSION_REJECT, term, error); 1573 } 1574 1575 bool Session::SendTerminateMessage(const std::string& reason, 1576 SessionError* error) { 1577 SessionTerminate term(reason); 1578 return SendMessage(ACTION_SESSION_TERMINATE, term, error); 1579 } 1580 1581 bool Session::WriteSessionAction(SignalingProtocol protocol, 1582 const SessionTerminate& term, 1583 XmlElements* elems, WriteError* error) { 1584 WriteSessionTerminate(protocol, term, elems); 1585 return true; 1586 } 1587 1588 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo, 1589 SessionError* error) { 1590 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error); 1591 } 1592 1593 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy, 1594 const Candidates& candidates, 1595 SessionError* error) { 1596 return SendTransportInfoMessage(TransportInfo(transproxy->content_name(), 1597 TransportDescription(transproxy->type(), std::vector<std::string>(), 1598 std::string(), std::string(), ICEMODE_FULL, 1599 CONNECTIONROLE_NONE, NULL, candidates)), error); 1600 } 1601 1602 bool Session::WriteSessionAction(SignalingProtocol protocol, 1603 const TransportInfo& tinfo, 1604 XmlElements* elems, WriteError* error) { 1605 TransportInfos tinfos; 1606 tinfos.push_back(tinfo); 1607 return WriteTransportInfos(protocol, tinfos, 1608 GetTransportParsers(), GetCandidateTranslators(), 1609 elems, error); 1610 } 1611 1612 bool Session::ResendAllTransportInfoMessages(SessionError* error) { 1613 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1614 iter != transport_proxies().end(); ++iter) { 1615 TransportProxy* transproxy = iter->second; 1616 if (transproxy->sent_candidates().size() > 0) { 1617 if (!SendTransportInfoMessage( 1618 transproxy, transproxy->sent_candidates(), error)) { 1619 LOG(LS_ERROR) << "Could not resend transport info messages: " 1620 << error->text; 1621 return false; 1622 } 1623 transproxy->ClearSentCandidates(); 1624 } 1625 } 1626 return true; 1627 } 1628 1629 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) { 1630 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1631 iter != transport_proxies().end(); ++iter) { 1632 TransportProxy* transproxy = iter->second; 1633 if (transproxy->unsent_candidates().size() > 0) { 1634 if (!SendTransportInfoMessage( 1635 transproxy, transproxy->unsent_candidates(), error)) { 1636 LOG(LS_ERROR) << "Could not send unsent transport info messages: " 1637 << error->text; 1638 return false; 1639 } 1640 transproxy->ClearUnsentCandidates(); 1641 } 1642 } 1643 return true; 1644 } 1645 1646 bool Session::SendMessage(ActionType type, const XmlElements& action_elems, 1647 SessionError* error) { 1648 return SendMessage(type, action_elems, remote_name(), error); 1649 } 1650 1651 bool Session::SendMessage(ActionType type, const XmlElements& action_elems, 1652 const std::string& remote_name, SessionError* error) { 1653 talk_base::scoped_ptr<buzz::XmlElement> stanza( 1654 new buzz::XmlElement(buzz::QN_IQ)); 1655 1656 SessionMessage msg(current_protocol_, type, id(), initiator_name()); 1657 msg.to = remote_name; 1658 WriteSessionMessage(msg, action_elems, stanza.get()); 1659 1660 SignalOutgoingMessage(this, stanza.get()); 1661 return true; 1662 } 1663 1664 template <typename Action> 1665 bool Session::SendMessage(ActionType type, const Action& action, 1666 SessionError* error) { 1667 talk_base::scoped_ptr<buzz::XmlElement> stanza( 1668 new buzz::XmlElement(buzz::QN_IQ)); 1669 if (!WriteActionMessage(type, action, stanza.get(), error)) 1670 return false; 1671 1672 SignalOutgoingMessage(this, stanza.get()); 1673 return true; 1674 } 1675 1676 template <typename Action> 1677 bool Session::WriteActionMessage(ActionType type, const Action& action, 1678 buzz::XmlElement* stanza, 1679 WriteError* error) { 1680 if (current_protocol_ == PROTOCOL_HYBRID) { 1681 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error)) 1682 return false; 1683 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error)) 1684 return false; 1685 } else { 1686 if (!WriteActionMessage(current_protocol_, type, action, stanza, error)) 1687 return false; 1688 } 1689 return true; 1690 } 1691 1692 template <typename Action> 1693 bool Session::WriteActionMessage(SignalingProtocol protocol, 1694 ActionType type, const Action& action, 1695 buzz::XmlElement* stanza, WriteError* error) { 1696 XmlElements action_elems; 1697 if (!WriteSessionAction(protocol, action, &action_elems, error)) 1698 return false; 1699 1700 SessionMessage msg(protocol, type, id(), initiator_name()); 1701 msg.to = remote_name(); 1702 1703 WriteSessionMessage(msg, action_elems, stanza); 1704 return true; 1705 } 1706 1707 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) { 1708 talk_base::scoped_ptr<buzz::XmlElement> ack( 1709 new buzz::XmlElement(buzz::QN_IQ)); 1710 ack->SetAttr(buzz::QN_TO, remote_name()); 1711 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID)); 1712 ack->SetAttr(buzz::QN_TYPE, "result"); 1713 1714 SignalOutgoingMessage(this, ack.get()); 1715 } 1716 1717 } // namespace cricket 1718