1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/p2p/base/session.h" 29 #include "talk/base/common.h" 30 #include "talk/base/logging.h" 31 #include "talk/base/helpers.h" 32 #include "talk/base/scoped_ptr.h" 33 #include "talk/base/sslstreamadapter.h" 34 #include "talk/xmpp/constants.h" 35 #include "talk/xmpp/jid.h" 36 #include "talk/p2p/base/dtlstransport.h" 37 #include "talk/p2p/base/p2ptransport.h" 38 #include "talk/p2p/base/sessionclient.h" 39 #include "talk/p2p/base/transport.h" 40 #include "talk/p2p/base/transportchannelproxy.h" 41 #include "talk/p2p/base/transportinfo.h" 42 43 #include "talk/p2p/base/constants.h" 44 45 namespace cricket { 46 47 bool BadMessage(const buzz::QName type, 48 const std::string& text, 49 MessageError* err) { 50 err->SetType(type); 51 err->SetText(text); 52 return false; 53 } 54 55 TransportProxy::~TransportProxy() { 56 for (ChannelMap::iterator iter = channels_.begin(); 57 iter != channels_.end(); ++iter) { 58 iter->second->SignalDestroyed(iter->second); 59 delete iter->second; 60 } 61 } 62 63 std::string TransportProxy::type() const { 64 return transport_->get()->type(); 65 } 66 67 TransportChannel* TransportProxy::GetChannel(int component) { 68 return GetChannelProxy(component); 69 } 70 71 TransportChannel* TransportProxy::CreateChannel( 72 const std::string& name, int component) { 73 ASSERT(GetChannel(component) == NULL); 74 ASSERT(!transport_->get()->HasChannel(component)); 75 76 // We always create a proxy in case we need to change out the transport later. 77 TransportChannelProxy* channel = 78 new TransportChannelProxy(content_name(), name, component); 79 channels_[component] = channel; 80 81 // If we're already negotiated, create an impl and hook it up to the proxy 82 // channel. If we're connecting, create an impl but don't hook it up yet. 83 if (negotiated_) { 84 SetChannelProxyImpl(component, channel); 85 } else if (connecting_) { 86 GetOrCreateChannelProxyImpl(component); 87 } 88 return channel; 89 } 90 91 bool TransportProxy::HasChannel(int component) { 92 return transport_->get()->HasChannel(component); 93 } 94 95 void TransportProxy::DestroyChannel(int component) { 96 TransportChannel* channel = GetChannel(component); 97 if (channel) { 98 // If the state of TransportProxy is not NEGOTIATED 99 // then TransportChannelProxy and its impl are not 100 // connected. Both must be connected before 101 // deletion. 102 if (!negotiated_) { 103 SetChannelProxyImpl(component, GetChannelProxy(component)); 104 } 105 106 channels_.erase(component); 107 channel->SignalDestroyed(channel); 108 delete channel; 109 } 110 } 111 112 void TransportProxy::ConnectChannels() { 113 if (!connecting_) { 114 if (!negotiated_) { 115 for (ChannelMap::iterator iter = channels_.begin(); 116 iter != channels_.end(); ++iter) { 117 GetOrCreateChannelProxyImpl(iter->first); 118 } 119 } 120 connecting_ = true; 121 } 122 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we 123 // don't have any channels yet, so we need to allow this method to be called 124 // multiple times. Once we fix Transport, we can move this call inside the 125 // if (!connecting_) block. 126 transport_->get()->ConnectChannels(); 127 } 128 129 void TransportProxy::CompleteNegotiation() { 130 if (!negotiated_) { 131 for (ChannelMap::iterator iter = channels_.begin(); 132 iter != channels_.end(); ++iter) { 133 SetChannelProxyImpl(iter->first, iter->second); 134 } 135 negotiated_ = true; 136 } 137 } 138 139 void TransportProxy::AddSentCandidates(const Candidates& candidates) { 140 for (Candidates::const_iterator cand = candidates.begin(); 141 cand != candidates.end(); ++cand) { 142 sent_candidates_.push_back(*cand); 143 } 144 } 145 146 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) { 147 for (Candidates::const_iterator cand = candidates.begin(); 148 cand != candidates.end(); ++cand) { 149 unsent_candidates_.push_back(*cand); 150 } 151 } 152 153 bool TransportProxy::GetChannelNameFromComponent( 154 int component, std::string* channel_name) const { 155 const TransportChannelProxy* channel = GetChannelProxy(component); 156 if (channel == NULL) { 157 return false; 158 } 159 160 *channel_name = channel->name(); 161 return true; 162 } 163 164 bool TransportProxy::GetComponentFromChannelName( 165 const std::string& channel_name, int* component) const { 166 const TransportChannelProxy* channel = GetChannelProxyByName(channel_name); 167 if (channel == NULL) { 168 return false; 169 } 170 171 *component = channel->component(); 172 return true; 173 } 174 175 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const { 176 ChannelMap::const_iterator iter = channels_.find(component); 177 return (iter != channels_.end()) ? iter->second : NULL; 178 } 179 180 TransportChannelProxy* TransportProxy::GetChannelProxyByName( 181 const std::string& name) const { 182 for (ChannelMap::const_iterator iter = channels_.begin(); 183 iter != channels_.end(); 184 ++iter) { 185 if (iter->second->name() == name) { 186 return iter->second; 187 } 188 } 189 return NULL; 190 } 191 192 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl( 193 int component) { 194 TransportChannelImpl* impl = transport_->get()->GetChannel(component); 195 if (impl == NULL) { 196 impl = transport_->get()->CreateChannel(component); 197 } 198 return impl; 199 } 200 201 void TransportProxy::SetChannelProxyImpl( 202 int component, TransportChannelProxy* transproxy) { 203 TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component); 204 ASSERT(impl != NULL); 205 transproxy->SetImplementation(impl); 206 } 207 208 // This function muxes |this| onto |target| by repointing |this| at 209 // |target|'s transport and setting our TransportChannelProxies 210 // to point to |target|'s underlying implementations. 211 bool TransportProxy::SetupMux(TransportProxy* target) { 212 // Bail out if there's nothing to do. 213 if (transport_ == target->transport_) { 214 return true; 215 } 216 217 // Run through all channels and remove any non-rtp transport channels before 218 // setting target transport channels. 219 for (ChannelMap::const_iterator iter = channels_.begin(); 220 iter != channels_.end(); ++iter) { 221 if (!target->transport_->get()->HasChannel(iter->first)) { 222 // Remove if channel doesn't exist in |transport_|. 223 iter->second->SetImplementation(NULL); 224 } else { 225 // Replace the impl for all the TransportProxyChannels with the channels 226 // from |target|'s transport. Fail if there's not an exact match. 227 iter->second->SetImplementation( 228 target->transport_->get()->CreateChannel(iter->first)); 229 } 230 } 231 232 // Now replace our transport. Must happen afterwards because 233 // it deletes all impls as a side effect. 234 transport_ = target->transport_; 235 transport_->get()->SignalCandidatesReady.connect( 236 this, &TransportProxy::OnTransportCandidatesReady); 237 set_candidates_allocated(target->candidates_allocated()); 238 return true; 239 } 240 241 void TransportProxy::SetIceRole(IceRole role) { 242 transport_->get()->SetIceRole(role); 243 } 244 245 bool TransportProxy::SetLocalTransportDescription( 246 const TransportDescription& description, ContentAction action) { 247 // If this is an answer, finalize the negotiation. 248 if (action == CA_ANSWER) { 249 CompleteNegotiation(); 250 } 251 return transport_->get()->SetLocalTransportDescription(description, action); 252 } 253 254 bool TransportProxy::SetRemoteTransportDescription( 255 const TransportDescription& description, ContentAction action) { 256 // If this is an answer, finalize the negotiation. 257 if (action == CA_ANSWER) { 258 CompleteNegotiation(); 259 } 260 return transport_->get()->SetRemoteTransportDescription(description, action); 261 } 262 263 void TransportProxy::OnSignalingReady() { 264 // If we're starting a new allocation sequence, reset our state. 265 set_candidates_allocated(false); 266 transport_->get()->OnSignalingReady(); 267 } 268 269 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates, 270 std::string* error) { 271 // Ensure the transport is negotiated before handling candidates. 272 // TODO(juberti): Remove this once everybody calls SetLocalTD. 273 CompleteNegotiation(); 274 275 // Verify each candidate before passing down to transport layer. 276 for (Candidates::const_iterator cand = candidates.begin(); 277 cand != candidates.end(); ++cand) { 278 if (!transport_->get()->VerifyCandidate(*cand, error)) 279 return false; 280 if (!HasChannel(cand->component())) { 281 *error = "Candidate has unknown component: " + cand->ToString() + 282 " for content: " + content_name_; 283 return false; 284 } 285 } 286 transport_->get()->OnRemoteCandidates(candidates); 287 return true; 288 } 289 290 void TransportProxy::SetIdentity( 291 talk_base::SSLIdentity* identity) { 292 transport_->get()->SetIdentity(identity); 293 } 294 295 std::string BaseSession::StateToString(State state) { 296 switch (state) { 297 case Session::STATE_INIT: 298 return "STATE_INIT"; 299 case Session::STATE_SENTINITIATE: 300 return "STATE_SENTINITIATE"; 301 case Session::STATE_RECEIVEDINITIATE: 302 return "STATE_RECEIVEDINITIATE"; 303 case Session::STATE_SENTPRACCEPT: 304 return "STATE_SENTPRACCEPT"; 305 case Session::STATE_SENTACCEPT: 306 return "STATE_SENTACCEPT"; 307 case Session::STATE_RECEIVEDPRACCEPT: 308 return "STATE_RECEIVEDPRACCEPT"; 309 case Session::STATE_RECEIVEDACCEPT: 310 return "STATE_RECEIVEDACCEPT"; 311 case Session::STATE_SENTMODIFY: 312 return "STATE_SENTMODIFY"; 313 case Session::STATE_RECEIVEDMODIFY: 314 return "STATE_RECEIVEDMODIFY"; 315 case Session::STATE_SENTREJECT: 316 return "STATE_SENTREJECT"; 317 case Session::STATE_RECEIVEDREJECT: 318 return "STATE_RECEIVEDREJECT"; 319 case Session::STATE_SENTREDIRECT: 320 return "STATE_SENTREDIRECT"; 321 case Session::STATE_SENTTERMINATE: 322 return "STATE_SENTTERMINATE"; 323 case Session::STATE_RECEIVEDTERMINATE: 324 return "STATE_RECEIVEDTERMINATE"; 325 case Session::STATE_INPROGRESS: 326 return "STATE_INPROGRESS"; 327 case Session::STATE_DEINIT: 328 return "STATE_DEINIT"; 329 default: 330 break; 331 } 332 return "STATE_" + talk_base::ToString(state); 333 } 334 335 BaseSession::BaseSession(talk_base::Thread* signaling_thread, 336 talk_base::Thread* worker_thread, 337 PortAllocator* port_allocator, 338 const std::string& sid, 339 const std::string& content_type, 340 bool initiator) 341 : state_(STATE_INIT), 342 error_(ERROR_NONE), 343 signaling_thread_(signaling_thread), 344 worker_thread_(worker_thread), 345 port_allocator_(port_allocator), 346 sid_(sid), 347 content_type_(content_type), 348 transport_type_(NS_GINGLE_P2P), 349 initiator_(initiator), 350 identity_(NULL), 351 local_description_(NULL), 352 remote_description_(NULL), 353 ice_tiebreaker_(talk_base::CreateRandomId64()), 354 role_switch_(false) { 355 ASSERT(signaling_thread->IsCurrent()); 356 } 357 358 BaseSession::~BaseSession() { 359 ASSERT(signaling_thread()->IsCurrent()); 360 361 ASSERT(state_ != STATE_DEINIT); 362 LogState(state_, STATE_DEINIT); 363 state_ = STATE_DEINIT; 364 SignalState(this, state_); 365 366 for (TransportMap::iterator iter = transports_.begin(); 367 iter != transports_.end(); ++iter) { 368 delete iter->second; 369 } 370 371 delete remote_description_; 372 delete local_description_; 373 } 374 375 bool BaseSession::SetIdentity(talk_base::SSLIdentity* identity) { 376 if (identity_) 377 return false; 378 identity_ = identity; 379 for (TransportMap::iterator iter = transports_.begin(); 380 iter != transports_.end(); ++iter) { 381 iter->second->SetIdentity(identity_); 382 } 383 return true; 384 } 385 386 bool BaseSession::PushdownTransportDescription(ContentSource source, 387 ContentAction action) { 388 if (source == CS_LOCAL) { 389 return PushdownLocalTransportDescription(local_description_, action); 390 } 391 return PushdownRemoteTransportDescription(remote_description_, action); 392 } 393 394 bool BaseSession::PushdownLocalTransportDescription( 395 const SessionDescription* sdesc, 396 ContentAction action) { 397 // Update the Transports with the right information, and trigger them to 398 // start connecting. 399 for (TransportMap::iterator iter = transports_.begin(); 400 iter != transports_.end(); ++iter) { 401 // If no transport info was in this session description, ret == false 402 // and we just skip this one. 403 TransportDescription tdesc; 404 bool ret = GetTransportDescription( 405 sdesc, iter->second->content_name(), &tdesc); 406 if (ret) { 407 if (!iter->second->SetLocalTransportDescription(tdesc, action)) { 408 return false; 409 } 410 411 iter->second->ConnectChannels(); 412 } 413 } 414 415 return true; 416 } 417 418 bool BaseSession::PushdownRemoteTransportDescription( 419 const SessionDescription* sdesc, 420 ContentAction action) { 421 // Update the Transports with the right information. 422 for (TransportMap::iterator iter = transports_.begin(); 423 iter != transports_.end(); ++iter) { 424 TransportDescription tdesc; 425 426 // If no transport info was in this session description, ret == false 427 // and we just skip this one. 428 bool ret = GetTransportDescription( 429 sdesc, iter->second->content_name(), &tdesc); 430 if (ret) { 431 if (!iter->second->SetRemoteTransportDescription(tdesc, action)) { 432 return false; 433 } 434 } 435 } 436 437 return true; 438 } 439 440 TransportChannel* BaseSession::CreateChannel(const std::string& content_name, 441 const std::string& channel_name, 442 int component) { 443 // We create the proxy "on demand" here because we need to support 444 // creating channels at any time, even before we send or receive 445 // initiate messages, which is before we create the transports. 446 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name); 447 return transproxy->CreateChannel(channel_name, component); 448 } 449 450 TransportChannel* BaseSession::GetChannel(const std::string& content_name, 451 int component) { 452 TransportProxy* transproxy = GetTransportProxy(content_name); 453 if (transproxy == NULL) 454 return NULL; 455 else 456 return transproxy->GetChannel(component); 457 } 458 459 void BaseSession::DestroyChannel(const std::string& content_name, 460 int component) { 461 TransportProxy* transproxy = GetTransportProxy(content_name); 462 ASSERT(transproxy != NULL); 463 transproxy->DestroyChannel(component); 464 } 465 466 TransportProxy* BaseSession::GetOrCreateTransportProxy( 467 const std::string& content_name) { 468 TransportProxy* transproxy = GetTransportProxy(content_name); 469 if (transproxy) 470 return transproxy; 471 472 Transport* transport = CreateTransport(content_name); 473 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED); 474 transport->SetIceTiebreaker(ice_tiebreaker_); 475 // TODO: Connect all the Transport signals to TransportProxy 476 // then to the BaseSession. 477 transport->SignalConnecting.connect( 478 this, &BaseSession::OnTransportConnecting); 479 transport->SignalWritableState.connect( 480 this, &BaseSession::OnTransportWritable); 481 transport->SignalRequestSignaling.connect( 482 this, &BaseSession::OnTransportRequestSignaling); 483 transport->SignalTransportError.connect( 484 this, &BaseSession::OnTransportSendError); 485 transport->SignalRouteChange.connect( 486 this, &BaseSession::OnTransportRouteChange); 487 transport->SignalCandidatesAllocationDone.connect( 488 this, &BaseSession::OnTransportCandidatesAllocationDone); 489 transport->SignalRoleConflict.connect( 490 this, &BaseSession::OnRoleConflict); 491 492 transproxy = new TransportProxy(sid_, content_name, 493 new TransportWrapper(transport)); 494 transproxy->SignalCandidatesReady.connect( 495 this, &BaseSession::OnTransportProxyCandidatesReady); 496 transports_[content_name] = transproxy; 497 498 return transproxy; 499 } 500 501 Transport* BaseSession::GetTransport(const std::string& content_name) { 502 TransportProxy* transproxy = GetTransportProxy(content_name); 503 if (transproxy == NULL) 504 return NULL; 505 return transproxy->impl(); 506 } 507 508 TransportProxy* BaseSession::GetTransportProxy( 509 const std::string& content_name) { 510 TransportMap::iterator iter = transports_.find(content_name); 511 return (iter != transports_.end()) ? iter->second : NULL; 512 } 513 514 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) { 515 for (TransportMap::iterator iter = transports_.begin(); 516 iter != transports_.end(); ++iter) { 517 TransportProxy* transproxy = iter->second; 518 if (transproxy->impl() == transport) { 519 return transproxy; 520 } 521 } 522 return NULL; 523 } 524 525 TransportProxy* BaseSession::GetFirstTransportProxy() { 526 if (transports_.empty()) 527 return NULL; 528 return transports_.begin()->second; 529 } 530 531 void BaseSession::DestroyTransportProxy( 532 const std::string& content_name) { 533 TransportMap::iterator iter = transports_.find(content_name); 534 if (iter != transports_.end()) { 535 delete iter->second; 536 transports_.erase(content_name); 537 } 538 } 539 540 cricket::Transport* BaseSession::CreateTransport( 541 const std::string& content_name) { 542 ASSERT(transport_type_ == NS_GINGLE_P2P); 543 return new cricket::DtlsTransport<P2PTransport>( 544 signaling_thread(), worker_thread(), content_name, 545 port_allocator(), identity_); 546 } 547 548 bool BaseSession::GetStats(SessionStats* stats) { 549 for (TransportMap::iterator iter = transports_.begin(); 550 iter != transports_.end(); ++iter) { 551 std::string proxy_id = iter->second->content_name(); 552 // We are ignoring not-yet-instantiated transports. 553 if (iter->second->impl()) { 554 std::string transport_id = iter->second->impl()->content_name(); 555 stats->proxy_to_transport[proxy_id] = transport_id; 556 if (stats->transport_stats.find(transport_id) 557 == stats->transport_stats.end()) { 558 TransportStats subinfos; 559 if (!iter->second->impl()->GetStats(&subinfos)) { 560 return false; 561 } 562 stats->transport_stats[transport_id] = subinfos; 563 } 564 } 565 } 566 return true; 567 } 568 569 void BaseSession::SetState(State state) { 570 ASSERT(signaling_thread_->IsCurrent()); 571 if (state != state_) { 572 LogState(state_, state); 573 state_ = state; 574 SignalState(this, state_); 575 signaling_thread_->Post(this, MSG_STATE); 576 } 577 SignalNewDescription(); 578 } 579 580 void BaseSession::SetError(Error error) { 581 ASSERT(signaling_thread_->IsCurrent()); 582 if (error != error_) { 583 error_ = error; 584 SignalError(this, error); 585 } 586 } 587 588 void BaseSession::OnSignalingReady() { 589 ASSERT(signaling_thread()->IsCurrent()); 590 for (TransportMap::iterator iter = transports_.begin(); 591 iter != transports_.end(); ++iter) { 592 iter->second->OnSignalingReady(); 593 } 594 } 595 596 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to 597 // start, remove this method once everyone calls PushdownLocalTD. 598 void BaseSession::SpeculativelyConnectAllTransportChannels() { 599 // Put all transports into the connecting state. 600 for (TransportMap::iterator iter = transports_.begin(); 601 iter != transports_.end(); ++iter) { 602 iter->second->ConnectChannels(); 603 } 604 } 605 606 bool BaseSession::OnRemoteCandidates(const std::string& content_name, 607 const Candidates& candidates, 608 std::string* error) { 609 // Give candidates to the appropriate transport, and tell that transport 610 // to start connecting, if it's not already doing so. 611 TransportProxy* transproxy = GetTransportProxy(content_name); 612 if (!transproxy) { 613 *error = "Unknown content name " + content_name; 614 return false; 615 } 616 if (!transproxy->OnRemoteCandidates(candidates, error)) { 617 return false; 618 } 619 // TODO(juberti): Remove this call once we can be sure that we always have 620 // a local transport description (which will trigger the connection). 621 transproxy->ConnectChannels(); 622 return true; 623 } 624 625 bool BaseSession::MaybeEnableMuxingSupport() { 626 // We need both a local and remote description to decide if we should mux. 627 if ((state_ == STATE_SENTINITIATE || 628 state_ == STATE_RECEIVEDINITIATE) && 629 ((local_description_ == NULL) || 630 (remote_description_ == NULL))) { 631 return false; 632 } 633 634 // In order to perform the multiplexing, we need all proxies to be in the 635 // negotiated state, i.e. to have implementations underneath. 636 // Ensure that this is the case, regardless of whether we are going to mux. 637 for (TransportMap::iterator iter = transports_.begin(); 638 iter != transports_.end(); ++iter) { 639 ASSERT(iter->second->negotiated()); 640 if (!iter->second->negotiated()) 641 return false; 642 } 643 644 // If both sides agree to BUNDLE, mux all the specified contents onto the 645 // transport belonging to the first content name in the BUNDLE group. 646 // If the contents are already muxed, this will be a no-op. 647 // TODO(juberti): Should this check that local and remote have configured 648 // BUNDLE the same way? 649 bool candidates_allocated = IsCandidateAllocationDone(); 650 const ContentGroup* local_bundle_group = 651 local_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 652 const ContentGroup* remote_bundle_group = 653 remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE); 654 if (local_bundle_group && remote_bundle_group && 655 local_bundle_group->FirstContentName()) { 656 const std::string* content_name = local_bundle_group->FirstContentName(); 657 const ContentInfo* content = 658 local_description_->GetContentByName(*content_name); 659 ASSERT(content != NULL); 660 if (!SetSelectedProxy(content->name, local_bundle_group)) { 661 LOG(LS_WARNING) << "Failed to set up BUNDLE"; 662 return false; 663 } 664 665 // If we weren't done gathering before, we might be done now, as a result 666 // of enabling mux. 667 LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: " 668 << *content_name; 669 if (!candidates_allocated) { 670 MaybeCandidateAllocationDone(); 671 } 672 } else { 673 LOG(LS_INFO) << "No BUNDLE information, not bundling."; 674 } 675 return true; 676 } 677 678 bool BaseSession::SetSelectedProxy(const std::string& content_name, 679 const ContentGroup* muxed_group) { 680 TransportProxy* selected_proxy = GetTransportProxy(content_name); 681 if (!selected_proxy) { 682 return false; 683 } 684 685 ASSERT(selected_proxy->negotiated()); 686 for (TransportMap::iterator iter = transports_.begin(); 687 iter != transports_.end(); ++iter) { 688 // If content is part of the mux group, then repoint its proxy at the 689 // transport object that we have chosen to mux onto. If the proxy 690 // is already pointing at the right object, it will be a no-op. 691 if (muxed_group->HasContentName(iter->first) && 692 !iter->second->SetupMux(selected_proxy)) { 693 return false; 694 } 695 } 696 return true; 697 } 698 699 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) { 700 // TODO(juberti): This is a clunky way of processing the done signal. Instead, 701 // TransportProxy should receive the done signal directly, set its allocated 702 // flag internally, and then reissue the done signal to Session. 703 // Overall we should make TransportProxy receive *all* the signals from 704 // Transport, since this removes the need to manually iterate over all 705 // the transports, as is needed to make sure signals are handled properly 706 // when BUNDLEing. 707 #if 0 708 ASSERT(!IsCandidateAllocationDone()); 709 #endif 710 for (TransportMap::iterator iter = transports_.begin(); 711 iter != transports_.end(); ++iter) { 712 if (iter->second->impl() == transport) { 713 iter->second->set_candidates_allocated(true); 714 } 715 } 716 MaybeCandidateAllocationDone(); 717 } 718 719 bool BaseSession::IsCandidateAllocationDone() const { 720 for (TransportMap::const_iterator iter = transports_.begin(); 721 iter != transports_.end(); ++iter) { 722 if (!iter->second->candidates_allocated()) 723 return false; 724 } 725 return true; 726 } 727 728 void BaseSession::MaybeCandidateAllocationDone() { 729 if (IsCandidateAllocationDone()) { 730 LOG(LS_INFO) << "Candidate gathering is complete."; 731 OnCandidatesAllocationDone(); 732 } 733 } 734 735 void BaseSession::OnRoleConflict() { 736 if (role_switch_) { 737 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport."; 738 return; 739 } 740 741 role_switch_ = true; 742 for (TransportMap::iterator iter = transports_.begin(); 743 iter != transports_.end(); ++iter) { 744 // Role will be reverse of initial role setting. 745 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING; 746 iter->second->SetIceRole(role); 747 } 748 } 749 750 void BaseSession::LogState(State old_state, State new_state) { 751 LOG(LS_INFO) << "Session:" << id() 752 << " Old state:" << StateToString(old_state) 753 << " New state:" << StateToString(new_state) 754 << " Type:" << content_type() 755 << " Transport:" << transport_type(); 756 } 757 758 bool BaseSession::GetTransportDescription(const SessionDescription* description, 759 const std::string& content_name, 760 TransportDescription* tdesc) { 761 if (!description || !tdesc) { 762 return false; 763 } 764 const TransportInfo* transport_info = 765 description->GetTransportInfoByName(content_name); 766 if (!transport_info) { 767 return false; 768 } 769 *tdesc = transport_info->description; 770 return true; 771 } 772 773 void BaseSession::SignalNewDescription() { 774 ContentAction action; 775 ContentSource source; 776 if (!GetContentAction(&action, &source)) { 777 return; 778 } 779 if (source == CS_LOCAL) { 780 SignalNewLocalDescription(this, action); 781 } else { 782 SignalNewRemoteDescription(this, action); 783 } 784 } 785 786 bool BaseSession::GetContentAction(ContentAction* action, 787 ContentSource* source) { 788 switch (state_) { 789 // new local description 790 case STATE_SENTINITIATE: 791 *action = CA_OFFER; 792 *source = CS_LOCAL; 793 break; 794 case STATE_SENTPRACCEPT: 795 *action = CA_PRANSWER; 796 *source = CS_LOCAL; 797 break; 798 case STATE_SENTACCEPT: 799 *action = CA_ANSWER; 800 *source = CS_LOCAL; 801 break; 802 // new remote description 803 case STATE_RECEIVEDINITIATE: 804 *action = CA_OFFER; 805 *source = CS_REMOTE; 806 break; 807 case STATE_RECEIVEDPRACCEPT: 808 *action = CA_PRANSWER; 809 *source = CS_REMOTE; 810 break; 811 case STATE_RECEIVEDACCEPT: 812 *action = CA_ANSWER; 813 *source = CS_REMOTE; 814 break; 815 default: 816 return false; 817 } 818 return true; 819 } 820 821 void BaseSession::OnMessage(talk_base::Message *pmsg) { 822 switch (pmsg->message_id) { 823 case MSG_TIMEOUT: 824 // Session timeout has occured. 825 SetError(ERROR_TIME); 826 break; 827 828 case MSG_STATE: 829 switch (state_) { 830 case STATE_SENTACCEPT: 831 case STATE_RECEIVEDACCEPT: 832 SetState(STATE_INPROGRESS); 833 break; 834 835 default: 836 // Explicitly ignoring some states here. 837 break; 838 } 839 break; 840 } 841 } 842 843 Session::Session(SessionManager* session_manager, 844 const std::string& local_name, 845 const std::string& initiator_name, 846 const std::string& sid, 847 const std::string& content_type, 848 SessionClient* client) 849 : BaseSession(session_manager->signaling_thread(), 850 session_manager->worker_thread(), 851 session_manager->port_allocator(), 852 sid, content_type, initiator_name == local_name) { 853 ASSERT(client != NULL); 854 session_manager_ = session_manager; 855 local_name_ = local_name; 856 initiator_name_ = initiator_name; 857 transport_parser_ = new P2PTransportParser(); 858 client_ = client; 859 initiate_acked_ = false; 860 current_protocol_ = PROTOCOL_HYBRID; 861 } 862 863 Session::~Session() { 864 delete transport_parser_; 865 } 866 867 bool Session::Initiate(const std::string &to, 868 const SessionDescription* sdesc) { 869 ASSERT(signaling_thread()->IsCurrent()); 870 SessionError error; 871 872 // Only from STATE_INIT 873 if (state() != STATE_INIT) 874 return false; 875 876 // Setup for signaling. 877 set_remote_name(to); 878 set_local_description(sdesc); 879 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()), 880 &error)) { 881 LOG(LS_ERROR) << "Could not create transports: " << error.text; 882 return false; 883 } 884 885 if (!SendInitiateMessage(sdesc, &error)) { 886 LOG(LS_ERROR) << "Could not send initiate message: " << error.text; 887 return false; 888 } 889 890 // We need to connect transport proxy and impl here so that we can process 891 // the TransportDescriptions. 892 SpeculativelyConnectAllTransportChannels(); 893 894 PushdownTransportDescription(CS_LOCAL, CA_OFFER); 895 SetState(Session::STATE_SENTINITIATE); 896 return true; 897 } 898 899 bool Session::Accept(const SessionDescription* sdesc) { 900 ASSERT(signaling_thread()->IsCurrent()); 901 902 // Only if just received initiate 903 if (state() != STATE_RECEIVEDINITIATE) 904 return false; 905 906 // Setup for signaling. 907 set_local_description(sdesc); 908 909 SessionError error; 910 if (!SendAcceptMessage(sdesc, &error)) { 911 LOG(LS_ERROR) << "Could not send accept message: " << error.text; 912 return false; 913 } 914 // TODO(juberti): Add BUNDLE support to transport-info messages. 915 PushdownTransportDescription(CS_LOCAL, CA_ANSWER); 916 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 917 SetState(Session::STATE_SENTACCEPT); 918 return true; 919 } 920 921 bool Session::Reject(const std::string& reason) { 922 ASSERT(signaling_thread()->IsCurrent()); 923 924 // Reject is sent in response to an initiate or modify, to reject the 925 // request 926 if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY) 927 return false; 928 929 SessionError error; 930 if (!SendRejectMessage(reason, &error)) { 931 LOG(LS_ERROR) << "Could not send reject message: " << error.text; 932 return false; 933 } 934 935 SetState(STATE_SENTREJECT); 936 return true; 937 } 938 939 bool Session::TerminateWithReason(const std::string& reason) { 940 ASSERT(signaling_thread()->IsCurrent()); 941 942 // Either side can terminate, at any time. 943 switch (state()) { 944 case STATE_SENTTERMINATE: 945 case STATE_RECEIVEDTERMINATE: 946 return false; 947 948 case STATE_SENTREJECT: 949 case STATE_RECEIVEDREJECT: 950 // We don't need to send terminate if we sent or received a reject... 951 // it's implicit. 952 break; 953 954 default: 955 SessionError error; 956 if (!SendTerminateMessage(reason, &error)) { 957 LOG(LS_ERROR) << "Could not send terminate message: " << error.text; 958 return false; 959 } 960 break; 961 } 962 963 SetState(STATE_SENTTERMINATE); 964 return true; 965 } 966 967 bool Session::SendInfoMessage(const XmlElements& elems) { 968 ASSERT(signaling_thread()->IsCurrent()); 969 SessionError error; 970 if (!SendMessage(ACTION_SESSION_INFO, elems, &error)) { 971 LOG(LS_ERROR) << "Could not send info message " << error.text; 972 return false; 973 } 974 return true; 975 } 976 977 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) { 978 XmlElements elems; 979 WriteError write_error; 980 if (!WriteDescriptionInfo(current_protocol_, 981 contents, 982 GetContentParsers(), 983 &elems, &write_error)) { 984 LOG(LS_ERROR) << "Could not write description info message: " 985 << write_error.text; 986 return false; 987 } 988 SessionError error; 989 if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) { 990 LOG(LS_ERROR) << "Could not send description info message: " 991 << error.text; 992 return false; 993 } 994 return true; 995 } 996 997 TransportInfos Session::GetEmptyTransportInfos( 998 const ContentInfos& contents) const { 999 TransportInfos tinfos; 1000 for (ContentInfos::const_iterator content = contents.begin(); 1001 content != contents.end(); ++content) { 1002 tinfos.push_back( 1003 TransportInfo(content->name, 1004 TransportDescription(transport_type(), Candidates()))); 1005 } 1006 return tinfos; 1007 } 1008 1009 bool Session::OnRemoteCandidates( 1010 const TransportInfos& tinfos, ParseError* error) { 1011 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1012 tinfo != tinfos.end(); ++tinfo) { 1013 std::string str_error; 1014 if (!BaseSession::OnRemoteCandidates( 1015 tinfo->content_name, tinfo->description.candidates, &str_error)) { 1016 return BadParse(str_error, error); 1017 } 1018 } 1019 return true; 1020 } 1021 1022 bool Session::CreateTransportProxies(const TransportInfos& tinfos, 1023 SessionError* error) { 1024 for (TransportInfos::const_iterator tinfo = tinfos.begin(); 1025 tinfo != tinfos.end(); ++tinfo) { 1026 if (tinfo->description.transport_type != transport_type()) { 1027 error->SetText("No supported transport in offer."); 1028 return false; 1029 } 1030 1031 GetOrCreateTransportProxy(tinfo->content_name); 1032 } 1033 return true; 1034 } 1035 1036 TransportParserMap Session::GetTransportParsers() { 1037 TransportParserMap parsers; 1038 parsers[transport_type()] = transport_parser_; 1039 return parsers; 1040 } 1041 1042 CandidateTranslatorMap Session::GetCandidateTranslators() { 1043 CandidateTranslatorMap translators; 1044 // NOTE: This technique makes it impossible to parse G-ICE 1045 // candidates in session-initiate messages because the channels 1046 // aren't yet created at that point. Since we don't use candidates 1047 // in session-initiate messages, we should be OK. Once we switch to 1048 // ICE, this translation shouldn't be necessary. 1049 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1050 iter != transport_proxies().end(); ++iter) { 1051 translators[iter->first] = iter->second; 1052 } 1053 return translators; 1054 } 1055 1056 ContentParserMap Session::GetContentParsers() { 1057 ContentParserMap parsers; 1058 parsers[content_type()] = client_; 1059 // We need to be able parse both RTP-based and SCTP-based Jingle 1060 // with the same client. 1061 if (content_type() == NS_JINGLE_RTP) { 1062 parsers[NS_JINGLE_DRAFT_SCTP] = client_; 1063 } 1064 return parsers; 1065 } 1066 1067 void Session::OnTransportRequestSignaling(Transport* transport) { 1068 ASSERT(signaling_thread()->IsCurrent()); 1069 TransportProxy* transproxy = GetTransportProxy(transport); 1070 ASSERT(transproxy != NULL); 1071 if (transproxy) { 1072 // Reset candidate allocation status for the transport proxy. 1073 transproxy->set_candidates_allocated(false); 1074 } 1075 SignalRequestSignaling(this); 1076 } 1077 1078 void Session::OnTransportConnecting(Transport* transport) { 1079 // This is an indication that we should begin watching the writability 1080 // state of the transport. 1081 OnTransportWritable(transport); 1082 } 1083 1084 void Session::OnTransportWritable(Transport* transport) { 1085 ASSERT(signaling_thread()->IsCurrent()); 1086 1087 // If the transport is not writable, start a timer to make sure that it 1088 // becomes writable within a reasonable amount of time. If it does not, we 1089 // terminate since we can't actually send data. If the transport is writable, 1090 // cancel the timer. Note that writability transitions may occur repeatedly 1091 // during the lifetime of the session. 1092 signaling_thread()->Clear(this, MSG_TIMEOUT); 1093 if (transport->HasChannels() && !transport->writable()) { 1094 signaling_thread()->PostDelayed( 1095 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); 1096 } 1097 } 1098 1099 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy, 1100 const Candidates& candidates) { 1101 ASSERT(signaling_thread()->IsCurrent()); 1102 if (transproxy != NULL) { 1103 if (initiator() && !initiate_acked_) { 1104 // TODO: This is to work around server re-ordering 1105 // messages. We send the candidates once the session-initiate 1106 // is acked. Once we have fixed the server to guarantee message 1107 // order, we can remove this case. 1108 transproxy->AddUnsentCandidates(candidates); 1109 } else { 1110 if (!transproxy->negotiated()) { 1111 transproxy->AddSentCandidates(candidates); 1112 } 1113 SessionError error; 1114 if (!SendTransportInfoMessage(transproxy, candidates, &error)) { 1115 LOG(LS_ERROR) << "Could not send transport info message: " 1116 << error.text; 1117 return; 1118 } 1119 } 1120 } 1121 } 1122 1123 void Session::OnTransportSendError(Transport* transport, 1124 const buzz::XmlElement* stanza, 1125 const buzz::QName& name, 1126 const std::string& type, 1127 const std::string& text, 1128 const buzz::XmlElement* extra_info) { 1129 ASSERT(signaling_thread()->IsCurrent()); 1130 SignalErrorMessage(this, stanza, name, type, text, extra_info); 1131 } 1132 1133 void Session::OnIncomingMessage(const SessionMessage& msg) { 1134 ASSERT(signaling_thread()->IsCurrent()); 1135 ASSERT(state() == STATE_INIT || msg.from == remote_name()); 1136 1137 if (current_protocol_== PROTOCOL_HYBRID) { 1138 if (msg.protocol == PROTOCOL_GINGLE) { 1139 current_protocol_ = PROTOCOL_GINGLE; 1140 } else { 1141 current_protocol_ = PROTOCOL_JINGLE; 1142 } 1143 } 1144 1145 bool valid = false; 1146 MessageError error; 1147 switch (msg.type) { 1148 case ACTION_SESSION_INITIATE: 1149 valid = OnInitiateMessage(msg, &error); 1150 break; 1151 case ACTION_SESSION_INFO: 1152 valid = OnInfoMessage(msg); 1153 break; 1154 case ACTION_SESSION_ACCEPT: 1155 valid = OnAcceptMessage(msg, &error); 1156 break; 1157 case ACTION_SESSION_REJECT: 1158 valid = OnRejectMessage(msg, &error); 1159 break; 1160 case ACTION_SESSION_TERMINATE: 1161 valid = OnTerminateMessage(msg, &error); 1162 break; 1163 case ACTION_TRANSPORT_INFO: 1164 valid = OnTransportInfoMessage(msg, &error); 1165 break; 1166 case ACTION_TRANSPORT_ACCEPT: 1167 valid = OnTransportAcceptMessage(msg, &error); 1168 break; 1169 case ACTION_DESCRIPTION_INFO: 1170 valid = OnDescriptionInfoMessage(msg, &error); 1171 break; 1172 default: 1173 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST, 1174 "unknown session message type", 1175 &error); 1176 } 1177 1178 if (valid) { 1179 SendAcknowledgementMessage(msg.stanza); 1180 } else { 1181 SignalErrorMessage(this, msg.stanza, error.type, 1182 "modify", error.text, NULL); 1183 } 1184 } 1185 1186 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza, 1187 const buzz::XmlElement* response_stanza, 1188 const SessionMessage& msg) { 1189 ASSERT(signaling_thread()->IsCurrent()); 1190 1191 if (msg.type == ACTION_SESSION_INITIATE) { 1192 OnInitiateAcked(); 1193 } 1194 } 1195 1196 void Session::OnInitiateAcked() { 1197 // TODO: This is to work around server re-ordering 1198 // messages. We send the candidates once the session-initiate 1199 // is acked. Once we have fixed the server to guarantee message 1200 // order, we can remove this case. 1201 if (!initiate_acked_) { 1202 initiate_acked_ = true; 1203 SessionError error; 1204 SendAllUnsentTransportInfoMessages(&error); 1205 } 1206 } 1207 1208 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza, 1209 const buzz::XmlElement* error_stanza) { 1210 ASSERT(signaling_thread()->IsCurrent()); 1211 1212 SessionMessage msg; 1213 ParseError parse_error; 1214 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) { 1215 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text 1216 << ":" << orig_stanza; 1217 return; 1218 } 1219 1220 // If the error is a session redirect, call OnRedirectError, which will 1221 // continue the session with a new remote JID. 1222 SessionRedirect redirect; 1223 if (FindSessionRedirect(error_stanza, &redirect)) { 1224 SessionError error; 1225 if (!OnRedirectError(redirect, &error)) { 1226 // TODO: Should we send a message back? The standard 1227 // says nothing about it. 1228 LOG(LS_ERROR) << "Failed to redirect: " << error.text; 1229 SetError(ERROR_RESPONSE); 1230 } 1231 return; 1232 } 1233 1234 std::string error_type = "cancel"; 1235 1236 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR); 1237 if (error) { 1238 error_type = error->Attr(buzz::QN_TYPE); 1239 1240 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n" 1241 << "in response to:\n" << orig_stanza->Str(); 1242 } else { 1243 // don't crash if <error> is missing 1244 LOG(LS_ERROR) << "Session error without <error/> element, ignoring"; 1245 return; 1246 } 1247 1248 if (msg.type == ACTION_TRANSPORT_INFO) { 1249 // Transport messages frequently generate errors because they are sent right 1250 // when we detect a network failure. For that reason, we ignore such 1251 // errors, because if we do not establish writability again, we will 1252 // terminate anyway. The exceptions are transport-specific error tags, 1253 // which we pass on to the respective transport. 1254 } else if ((error_type != "continue") && (error_type != "wait")) { 1255 // We do not set an error if the other side said it is okay to continue 1256 // (possibly after waiting). These errors can be ignored. 1257 SetError(ERROR_RESPONSE); 1258 } 1259 } 1260 1261 bool Session::OnInitiateMessage(const SessionMessage& msg, 1262 MessageError* error) { 1263 if (!CheckState(STATE_INIT, error)) 1264 return false; 1265 1266 SessionInitiate init; 1267 if (!ParseSessionInitiate(msg.protocol, msg.action_elem, 1268 GetContentParsers(), GetTransportParsers(), 1269 GetCandidateTranslators(), 1270 &init, error)) 1271 return false; 1272 1273 SessionError session_error; 1274 if (!CreateTransportProxies(init.transports, &session_error)) { 1275 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE, 1276 session_error.text, error); 1277 } 1278 1279 set_remote_name(msg.from); 1280 set_initiator_name(msg.initiator); 1281 set_remote_description(new SessionDescription(init.ClearContents(), 1282 init.transports, 1283 init.groups)); 1284 // Updating transport with TransportDescription. 1285 PushdownTransportDescription(CS_REMOTE, CA_OFFER); 1286 SetState(STATE_RECEIVEDINITIATE); 1287 1288 // Users of Session may listen to state change and call Reject(). 1289 if (state() != STATE_SENTREJECT) { 1290 if (!OnRemoteCandidates(init.transports, error)) 1291 return false; 1292 1293 // TODO(juberti): Auto-generate and push down the local transport answer. 1294 // This is necessary for trickling to work with RFC 5245 ICE. 1295 } 1296 return true; 1297 } 1298 1299 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) { 1300 if (!CheckState(STATE_SENTINITIATE, error)) 1301 return false; 1302 1303 SessionAccept accept; 1304 if (!ParseSessionAccept(msg.protocol, msg.action_elem, 1305 GetContentParsers(), GetTransportParsers(), 1306 GetCandidateTranslators(), 1307 &accept, error)) { 1308 return false; 1309 } 1310 1311 // If we get an accept, we can assume the initiate has been 1312 // received, even if we haven't gotten an IQ response. 1313 OnInitiateAcked(); 1314 1315 set_remote_description(new SessionDescription(accept.ClearContents(), 1316 accept.transports, 1317 accept.groups)); 1318 // Updating transport with TransportDescription. 1319 PushdownTransportDescription(CS_REMOTE, CA_ANSWER); 1320 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported. 1321 SetState(STATE_RECEIVEDACCEPT); 1322 1323 if (!OnRemoteCandidates(accept.transports, error)) 1324 return false; 1325 1326 return true; 1327 } 1328 1329 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) { 1330 if (!CheckState(STATE_SENTINITIATE, error)) 1331 return false; 1332 1333 SetState(STATE_RECEIVEDREJECT); 1334 return true; 1335 } 1336 1337 bool Session::OnInfoMessage(const SessionMessage& msg) { 1338 SignalInfoMessage(this, msg.action_elem); 1339 return true; 1340 } 1341 1342 bool Session::OnTerminateMessage(const SessionMessage& msg, 1343 MessageError* error) { 1344 SessionTerminate term; 1345 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error)) 1346 return false; 1347 1348 SignalReceivedTerminateReason(this, term.reason); 1349 if (term.debug_reason != buzz::STR_EMPTY) { 1350 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason; 1351 } 1352 1353 SetState(STATE_RECEIVEDTERMINATE); 1354 return true; 1355 } 1356 1357 bool Session::OnTransportInfoMessage(const SessionMessage& msg, 1358 MessageError* error) { 1359 TransportInfos tinfos; 1360 if (!ParseTransportInfos(msg.protocol, msg.action_elem, 1361 initiator_description()->contents(), 1362 GetTransportParsers(), GetCandidateTranslators(), 1363 &tinfos, error)) 1364 return false; 1365 1366 if (!OnRemoteCandidates(tinfos, error)) 1367 return false; 1368 1369 return true; 1370 } 1371 1372 bool Session::OnTransportAcceptMessage(const SessionMessage& msg, 1373 MessageError* error) { 1374 // TODO: Currently here only for compatibility with 1375 // Gingle 1.1 clients (notably, Google Voice). 1376 return true; 1377 } 1378 1379 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg, 1380 MessageError* error) { 1381 if (!CheckState(STATE_INPROGRESS, error)) 1382 return false; 1383 1384 DescriptionInfo description_info; 1385 if (!ParseDescriptionInfo(msg.protocol, msg.action_elem, 1386 GetContentParsers(), GetTransportParsers(), 1387 GetCandidateTranslators(), 1388 &description_info, error)) { 1389 return false; 1390 } 1391 1392 ContentInfos& updated_contents = description_info.contents; 1393 1394 // TODO: Currently, reflector sends back 1395 // video stream updates even for an audio-only call, which causes 1396 // this to fail. Put this back once reflector is fixed. 1397 // 1398 // ContentInfos::iterator it; 1399 // First, ensure all updates are valid before modifying remote_description_. 1400 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1401 // if (remote_description()->GetContentByName(it->name) == NULL) { 1402 // return false; 1403 // } 1404 // } 1405 1406 // TODO: We used to replace contents from an update, but 1407 // that no longer works with partial updates. We need to figure out 1408 // a way to merge patial updates into contents. For now, users of 1409 // Session should listen to SignalRemoteDescriptionUpdate and handle 1410 // updates. They should not expect remote_description to be the 1411 // latest value. 1412 // 1413 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) { 1414 // remote_description()->RemoveContentByName(it->name); 1415 // remote_description()->AddContent(it->name, it->type, it->description); 1416 // } 1417 // } 1418 1419 SignalRemoteDescriptionUpdate(this, updated_contents); 1420 return true; 1421 } 1422 1423 bool BareJidsEqual(const std::string& name1, 1424 const std::string& name2) { 1425 buzz::Jid jid1(name1); 1426 buzz::Jid jid2(name2); 1427 1428 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2); 1429 } 1430 1431 bool Session::OnRedirectError(const SessionRedirect& redirect, 1432 SessionError* error) { 1433 MessageError message_error; 1434 if (!CheckState(STATE_SENTINITIATE, &message_error)) { 1435 return BadWrite(message_error.text, error); 1436 } 1437 1438 if (!BareJidsEqual(remote_name(), redirect.target)) 1439 return BadWrite("Redirection not allowed: must be the same bare jid.", 1440 error); 1441 1442 // When we receive a redirect, we point the session at the new JID 1443 // and resend the candidates. 1444 set_remote_name(redirect.target); 1445 return (SendInitiateMessage(local_description(), error) && 1446 ResendAllTransportInfoMessages(error)); 1447 } 1448 1449 bool Session::CheckState(State expected, MessageError* error) { 1450 if (state() != expected) { 1451 // The server can deliver messages out of order/repeated for various 1452 // reasons. For example, if the server does not recive our iq response, 1453 // it could assume that the iq it sent was lost, and will then send 1454 // it again. Ideally, we should implement reliable messaging with 1455 // duplicate elimination. 1456 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED, 1457 "message not allowed in current state", 1458 error); 1459 } 1460 return true; 1461 } 1462 1463 void Session::SetError(Error error) { 1464 BaseSession::SetError(error); 1465 if (error != ERROR_NONE) 1466 signaling_thread()->Post(this, MSG_ERROR); 1467 } 1468 1469 void Session::OnMessage(talk_base::Message* pmsg) { 1470 // preserve this because BaseSession::OnMessage may modify it 1471 State orig_state = state(); 1472 1473 BaseSession::OnMessage(pmsg); 1474 1475 switch (pmsg->message_id) { 1476 case MSG_ERROR: 1477 TerminateWithReason(STR_TERMINATE_ERROR); 1478 break; 1479 1480 case MSG_STATE: 1481 switch (orig_state) { 1482 case STATE_SENTREJECT: 1483 case STATE_RECEIVEDREJECT: 1484 // Assume clean termination. 1485 Terminate(); 1486 break; 1487 1488 case STATE_SENTTERMINATE: 1489 case STATE_RECEIVEDTERMINATE: 1490 session_manager_->DestroySession(this); 1491 break; 1492 1493 default: 1494 // Explicitly ignoring some states here. 1495 break; 1496 } 1497 break; 1498 } 1499 } 1500 1501 bool Session::SendInitiateMessage(const SessionDescription* sdesc, 1502 SessionError* error) { 1503 SessionInitiate init; 1504 init.contents = sdesc->contents(); 1505 init.transports = GetEmptyTransportInfos(init.contents); 1506 init.groups = sdesc->groups(); 1507 return SendMessage(ACTION_SESSION_INITIATE, init, error); 1508 } 1509 1510 bool Session::WriteSessionAction( 1511 SignalingProtocol protocol, const SessionInitiate& init, 1512 XmlElements* elems, WriteError* error) { 1513 return WriteSessionInitiate(protocol, init.contents, init.transports, 1514 GetContentParsers(), GetTransportParsers(), 1515 GetCandidateTranslators(), init.groups, 1516 elems, error); 1517 } 1518 1519 bool Session::SendAcceptMessage(const SessionDescription* sdesc, 1520 SessionError* error) { 1521 XmlElements elems; 1522 if (!WriteSessionAccept(current_protocol_, 1523 sdesc->contents(), 1524 GetEmptyTransportInfos(sdesc->contents()), 1525 GetContentParsers(), GetTransportParsers(), 1526 GetCandidateTranslators(), sdesc->groups(), 1527 &elems, error)) { 1528 return false; 1529 } 1530 return SendMessage(ACTION_SESSION_ACCEPT, elems, error); 1531 } 1532 1533 bool Session::SendRejectMessage(const std::string& reason, 1534 SessionError* error) { 1535 SessionTerminate term(reason); 1536 return SendMessage(ACTION_SESSION_REJECT, term, error); 1537 } 1538 1539 bool Session::SendTerminateMessage(const std::string& reason, 1540 SessionError* error) { 1541 SessionTerminate term(reason); 1542 return SendMessage(ACTION_SESSION_TERMINATE, term, error); 1543 } 1544 1545 bool Session::WriteSessionAction(SignalingProtocol protocol, 1546 const SessionTerminate& term, 1547 XmlElements* elems, WriteError* error) { 1548 WriteSessionTerminate(protocol, term, elems); 1549 return true; 1550 } 1551 1552 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo, 1553 SessionError* error) { 1554 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error); 1555 } 1556 1557 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy, 1558 const Candidates& candidates, 1559 SessionError* error) { 1560 return SendTransportInfoMessage(TransportInfo(transproxy->content_name(), 1561 TransportDescription(transproxy->type(), candidates)), error); 1562 } 1563 1564 bool Session::WriteSessionAction(SignalingProtocol protocol, 1565 const TransportInfo& tinfo, 1566 XmlElements* elems, WriteError* error) { 1567 TransportInfos tinfos; 1568 tinfos.push_back(tinfo); 1569 return WriteTransportInfos(protocol, tinfos, 1570 GetTransportParsers(), GetCandidateTranslators(), 1571 elems, error); 1572 } 1573 1574 bool Session::ResendAllTransportInfoMessages(SessionError* error) { 1575 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1576 iter != transport_proxies().end(); ++iter) { 1577 TransportProxy* transproxy = iter->second; 1578 if (transproxy->sent_candidates().size() > 0) { 1579 if (!SendTransportInfoMessage( 1580 transproxy, transproxy->sent_candidates(), error)) { 1581 LOG(LS_ERROR) << "Could not resend transport info messages: " 1582 << error->text; 1583 return false; 1584 } 1585 transproxy->ClearSentCandidates(); 1586 } 1587 } 1588 return true; 1589 } 1590 1591 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) { 1592 for (TransportMap::const_iterator iter = transport_proxies().begin(); 1593 iter != transport_proxies().end(); ++iter) { 1594 TransportProxy* transproxy = iter->second; 1595 if (transproxy->unsent_candidates().size() > 0) { 1596 if (!SendTransportInfoMessage( 1597 transproxy, transproxy->unsent_candidates(), error)) { 1598 LOG(LS_ERROR) << "Could not send unsent transport info messages: " 1599 << error->text; 1600 return false; 1601 } 1602 transproxy->ClearUnsentCandidates(); 1603 } 1604 } 1605 return true; 1606 } 1607 1608 bool Session::SendMessage(ActionType type, const XmlElements& action_elems, 1609 SessionError* error) { 1610 talk_base::scoped_ptr<buzz::XmlElement> stanza( 1611 new buzz::XmlElement(buzz::QN_IQ)); 1612 1613 SessionMessage msg(current_protocol_, type, id(), initiator_name()); 1614 msg.to = remote_name(); 1615 WriteSessionMessage(msg, action_elems, stanza.get()); 1616 1617 SignalOutgoingMessage(this, stanza.get()); 1618 return true; 1619 } 1620 1621 template <typename Action> 1622 bool Session::SendMessage(ActionType type, const Action& action, 1623 SessionError* error) { 1624 talk_base::scoped_ptr<buzz::XmlElement> stanza( 1625 new buzz::XmlElement(buzz::QN_IQ)); 1626 if (!WriteActionMessage(type, action, stanza.get(), error)) 1627 return false; 1628 1629 SignalOutgoingMessage(this, stanza.get()); 1630 return true; 1631 } 1632 1633 template <typename Action> 1634 bool Session::WriteActionMessage(ActionType type, const Action& action, 1635 buzz::XmlElement* stanza, 1636 WriteError* error) { 1637 if (current_protocol_ == PROTOCOL_HYBRID) { 1638 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error)) 1639 return false; 1640 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error)) 1641 return false; 1642 } else { 1643 if (!WriteActionMessage(current_protocol_, type, action, stanza, error)) 1644 return false; 1645 } 1646 return true; 1647 } 1648 1649 template <typename Action> 1650 bool Session::WriteActionMessage(SignalingProtocol protocol, 1651 ActionType type, const Action& action, 1652 buzz::XmlElement* stanza, WriteError* error) { 1653 XmlElements action_elems; 1654 if (!WriteSessionAction(protocol, action, &action_elems, error)) 1655 return false; 1656 1657 SessionMessage msg(protocol, type, id(), initiator_name()); 1658 msg.to = remote_name(); 1659 1660 WriteSessionMessage(msg, action_elems, stanza); 1661 return true; 1662 } 1663 1664 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) { 1665 talk_base::scoped_ptr<buzz::XmlElement> ack( 1666 new buzz::XmlElement(buzz::QN_IQ)); 1667 ack->SetAttr(buzz::QN_TO, remote_name()); 1668 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID)); 1669 ack->SetAttr(buzz::QN_TYPE, "result"); 1670 1671 SignalOutgoingMessage(this, ack.get()); 1672 } 1673 1674 } // namespace cricket 1675