Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/p2p/base/session.h"
     29 
     30 #include "talk/p2p/base/dtlstransport.h"
     31 #include "talk/p2p/base/p2ptransport.h"
     32 #include "talk/p2p/base/sessionclient.h"
     33 #include "talk/p2p/base/transport.h"
     34 #include "talk/p2p/base/transportchannelproxy.h"
     35 #include "talk/p2p/base/transportinfo.h"
     36 #include "talk/xmpp/constants.h"
     37 #include "talk/xmpp/jid.h"
     38 #include "webrtc/base/bind.h"
     39 #include "webrtc/base/common.h"
     40 #include "webrtc/base/helpers.h"
     41 #include "webrtc/base/logging.h"
     42 #include "webrtc/base/scoped_ptr.h"
     43 #include "webrtc/base/sslstreamadapter.h"
     44 
     45 #include "talk/p2p/base/constants.h"
     46 
     47 namespace cricket {
     48 
     49 using rtc::Bind;
     50 
     51 bool BadMessage(const buzz::QName type,
     52                 const std::string& text,
     53                 MessageError* err) {
     54   err->SetType(type);
     55   err->SetText(text);
     56   return false;
     57 }
     58 
     59 TransportProxy::~TransportProxy() {
     60   for (ChannelMap::iterator iter = channels_.begin();
     61        iter != channels_.end(); ++iter) {
     62     iter->second->SignalDestroyed(iter->second);
     63     delete iter->second;
     64   }
     65 }
     66 
     67 const std::string& TransportProxy::type() const {
     68   return transport_->get()->type();
     69 }
     70 
     71 TransportChannel* TransportProxy::GetChannel(int component) {
     72   ASSERT(rtc::Thread::Current() == worker_thread_);
     73   return GetChannelProxy(component);
     74 }
     75 
     76 TransportChannel* TransportProxy::CreateChannel(
     77     const std::string& name, int component) {
     78   ASSERT(rtc::Thread::Current() == worker_thread_);
     79   ASSERT(GetChannel(component) == NULL);
     80   ASSERT(!transport_->get()->HasChannel(component));
     81 
     82   // We always create a proxy in case we need to change out the transport later.
     83   TransportChannelProxy* channel =
     84       new TransportChannelProxy(content_name(), name, component);
     85   channels_[component] = channel;
     86 
     87   // If we're already negotiated, create an impl and hook it up to the proxy
     88   // channel. If we're connecting, create an impl but don't hook it up yet.
     89   if (negotiated_) {
     90     SetupChannelProxy_w(component, channel);
     91   } else if (connecting_) {
     92     GetOrCreateChannelProxyImpl_w(component);
     93   }
     94   return channel;
     95 }
     96 
     97 bool TransportProxy::HasChannel(int component) {
     98   return transport_->get()->HasChannel(component);
     99 }
    100 
    101 void TransportProxy::DestroyChannel(int component) {
    102   ASSERT(rtc::Thread::Current() == worker_thread_);
    103   TransportChannel* channel = GetChannel(component);
    104   if (channel) {
    105     // If the state of TransportProxy is not NEGOTIATED
    106     // then TransportChannelProxy and its impl are not
    107     // connected. Both must be connected before
    108     // deletion.
    109     if (!negotiated_) {
    110       SetupChannelProxy_w(component, GetChannelProxy(component));
    111     }
    112 
    113     channels_.erase(component);
    114     channel->SignalDestroyed(channel);
    115     delete channel;
    116   }
    117 }
    118 
    119 void TransportProxy::ConnectChannels() {
    120   if (!connecting_) {
    121     if (!negotiated_) {
    122       for (ChannelMap::iterator iter = channels_.begin();
    123            iter != channels_.end(); ++iter) {
    124         GetOrCreateChannelProxyImpl(iter->first);
    125       }
    126     }
    127     connecting_ = true;
    128   }
    129   // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
    130   // don't have any channels yet, so we need to allow this method to be called
    131   // multiple times. Once we fix Transport, we can move this call inside the
    132   // if (!connecting_) block.
    133   transport_->get()->ConnectChannels();
    134 }
    135 
    136 void TransportProxy::CompleteNegotiation() {
    137   if (!negotiated_) {
    138     for (ChannelMap::iterator iter = channels_.begin();
    139          iter != channels_.end(); ++iter) {
    140       SetupChannelProxy(iter->first, iter->second);
    141     }
    142     negotiated_ = true;
    143   }
    144 }
    145 
    146 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
    147   for (Candidates::const_iterator cand = candidates.begin();
    148        cand != candidates.end(); ++cand) {
    149     sent_candidates_.push_back(*cand);
    150   }
    151 }
    152 
    153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
    154   for (Candidates::const_iterator cand = candidates.begin();
    155        cand != candidates.end(); ++cand) {
    156     unsent_candidates_.push_back(*cand);
    157   }
    158 }
    159 
    160 bool TransportProxy::GetChannelNameFromComponent(
    161     int component, std::string* channel_name) const {
    162   const TransportChannelProxy* channel = GetChannelProxy(component);
    163   if (channel == NULL) {
    164     return false;
    165   }
    166 
    167   *channel_name = channel->name();
    168   return true;
    169 }
    170 
    171 bool TransportProxy::GetComponentFromChannelName(
    172     const std::string& channel_name, int* component) const {
    173   const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
    174   if (channel == NULL) {
    175     return false;
    176   }
    177 
    178   *component = channel->component();
    179   return true;
    180 }
    181 
    182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
    183   ChannelMap::const_iterator iter = channels_.find(component);
    184   return (iter != channels_.end()) ? iter->second : NULL;
    185 }
    186 
    187 TransportChannelProxy* TransportProxy::GetChannelProxyByName(
    188     const std::string& name) const {
    189   for (ChannelMap::const_iterator iter = channels_.begin();
    190        iter != channels_.end();
    191        ++iter) {
    192     if (iter->second->name() == name) {
    193       return iter->second;
    194     }
    195   }
    196   return NULL;
    197 }
    198 
    199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
    200     int component) {
    201   return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
    202       &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
    203 }
    204 
    205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w(
    206     int component) {
    207   ASSERT(rtc::Thread::Current() == worker_thread_);
    208   TransportChannelImpl* impl = transport_->get()->GetChannel(component);
    209   if (impl == NULL) {
    210     impl = transport_->get()->CreateChannel(component);
    211   }
    212   return impl;
    213 }
    214 
    215 void TransportProxy::SetupChannelProxy(
    216     int component, TransportChannelProxy* transproxy) {
    217   worker_thread_->Invoke<void>(Bind(
    218       &TransportProxy::SetupChannelProxy_w, this, component, transproxy));
    219 }
    220 
    221 void TransportProxy::SetupChannelProxy_w(
    222     int component, TransportChannelProxy* transproxy) {
    223   ASSERT(rtc::Thread::Current() == worker_thread_);
    224   TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
    225   ASSERT(impl != NULL);
    226   transproxy->SetImplementation(impl);
    227 }
    228 
    229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
    230                                              TransportChannelImpl* impl) {
    231   worker_thread_->Invoke<void>(Bind(
    232       &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl));
    233 }
    234 
    235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy,
    236                                                TransportChannelImpl* impl) {
    237   ASSERT(rtc::Thread::Current() == worker_thread_);
    238   ASSERT(proxy != NULL);
    239   proxy->SetImplementation(impl);
    240 }
    241 
    242 // This function muxes |this| onto |target| by repointing |this| at
    243 // |target|'s transport and setting our TransportChannelProxies
    244 // to point to |target|'s underlying implementations.
    245 bool TransportProxy::SetupMux(TransportProxy* target) {
    246   // Bail out if there's nothing to do.
    247   if (transport_ == target->transport_) {
    248     return true;
    249   }
    250 
    251   // Run through all channels and remove any non-rtp transport channels before
    252   // setting target transport channels.
    253   for (ChannelMap::const_iterator iter = channels_.begin();
    254        iter != channels_.end(); ++iter) {
    255     if (!target->transport_->get()->HasChannel(iter->first)) {
    256       // Remove if channel doesn't exist in |transport_|.
    257       ReplaceChannelProxyImpl(iter->second, NULL);
    258     } else {
    259       // Replace the impl for all the TransportProxyChannels with the channels
    260       // from |target|'s transport. Fail if there's not an exact match.
    261       ReplaceChannelProxyImpl(
    262           iter->second, target->transport_->get()->CreateChannel(iter->first));
    263     }
    264   }
    265 
    266   // Now replace our transport. Must happen afterwards because
    267   // it deletes all impls as a side effect.
    268   transport_ = target->transport_;
    269   transport_->get()->SignalCandidatesReady.connect(
    270       this, &TransportProxy::OnTransportCandidatesReady);
    271   set_candidates_allocated(target->candidates_allocated());
    272   return true;
    273 }
    274 
    275 void TransportProxy::SetIceRole(IceRole role) {
    276   transport_->get()->SetIceRole(role);
    277 }
    278 
    279 bool TransportProxy::SetLocalTransportDescription(
    280     const TransportDescription& description,
    281     ContentAction action,
    282     std::string* error_desc) {
    283   // If this is an answer, finalize the negotiation.
    284   if (action == CA_ANSWER) {
    285     CompleteNegotiation();
    286   }
    287   bool result = transport_->get()->SetLocalTransportDescription(description,
    288                                                                 action,
    289                                                                 error_desc);
    290   if (result)
    291     local_description_set_ = true;
    292   return result;
    293 }
    294 
    295 bool TransportProxy::SetRemoteTransportDescription(
    296     const TransportDescription& description,
    297     ContentAction action,
    298     std::string* error_desc) {
    299   // If this is an answer, finalize the negotiation.
    300   if (action == CA_ANSWER) {
    301     CompleteNegotiation();
    302   }
    303   bool result = transport_->get()->SetRemoteTransportDescription(description,
    304                                                                  action,
    305                                                                  error_desc);
    306   if (result)
    307     remote_description_set_ = true;
    308   return result;
    309 }
    310 
    311 void TransportProxy::OnSignalingReady() {
    312   // If we're starting a new allocation sequence, reset our state.
    313   set_candidates_allocated(false);
    314   transport_->get()->OnSignalingReady();
    315 }
    316 
    317 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
    318                                         std::string* error) {
    319   // Ensure the transport is negotiated before handling candidates.
    320   // TODO(juberti): Remove this once everybody calls SetLocalTD.
    321   CompleteNegotiation();
    322 
    323   // Verify each candidate before passing down to transport layer.
    324   for (Candidates::const_iterator cand = candidates.begin();
    325        cand != candidates.end(); ++cand) {
    326     if (!transport_->get()->VerifyCandidate(*cand, error))
    327       return false;
    328     if (!HasChannel(cand->component())) {
    329       *error = "Candidate has unknown component: " + cand->ToString() +
    330                " for content: " + content_name_;
    331       return false;
    332     }
    333   }
    334   transport_->get()->OnRemoteCandidates(candidates);
    335   return true;
    336 }
    337 
    338 void TransportProxy::SetIdentity(
    339     rtc::SSLIdentity* identity) {
    340   transport_->get()->SetIdentity(identity);
    341 }
    342 
    343 std::string BaseSession::StateToString(State state) {
    344   switch (state) {
    345     case Session::STATE_INIT:
    346       return "STATE_INIT";
    347     case Session::STATE_SENTINITIATE:
    348       return "STATE_SENTINITIATE";
    349     case Session::STATE_RECEIVEDINITIATE:
    350       return "STATE_RECEIVEDINITIATE";
    351     case Session::STATE_SENTPRACCEPT:
    352       return "STATE_SENTPRACCEPT";
    353     case Session::STATE_SENTACCEPT:
    354       return "STATE_SENTACCEPT";
    355     case Session::STATE_RECEIVEDPRACCEPT:
    356       return "STATE_RECEIVEDPRACCEPT";
    357     case Session::STATE_RECEIVEDACCEPT:
    358       return "STATE_RECEIVEDACCEPT";
    359     case Session::STATE_SENTMODIFY:
    360       return "STATE_SENTMODIFY";
    361     case Session::STATE_RECEIVEDMODIFY:
    362       return "STATE_RECEIVEDMODIFY";
    363     case Session::STATE_SENTREJECT:
    364       return "STATE_SENTREJECT";
    365     case Session::STATE_RECEIVEDREJECT:
    366       return "STATE_RECEIVEDREJECT";
    367     case Session::STATE_SENTREDIRECT:
    368       return "STATE_SENTREDIRECT";
    369     case Session::STATE_SENTTERMINATE:
    370       return "STATE_SENTTERMINATE";
    371     case Session::STATE_RECEIVEDTERMINATE:
    372       return "STATE_RECEIVEDTERMINATE";
    373     case Session::STATE_INPROGRESS:
    374       return "STATE_INPROGRESS";
    375     case Session::STATE_DEINIT:
    376       return "STATE_DEINIT";
    377     default:
    378       break;
    379   }
    380   return "STATE_" + rtc::ToString(state);
    381 }
    382 
    383 BaseSession::BaseSession(rtc::Thread* signaling_thread,
    384                          rtc::Thread* worker_thread,
    385                          PortAllocator* port_allocator,
    386                          const std::string& sid,
    387                          const std::string& content_type,
    388                          bool initiator)
    389     : state_(STATE_INIT),
    390       error_(ERROR_NONE),
    391       signaling_thread_(signaling_thread),
    392       worker_thread_(worker_thread),
    393       port_allocator_(port_allocator),
    394       sid_(sid),
    395       content_type_(content_type),
    396       transport_type_(NS_GINGLE_P2P),
    397       initiator_(initiator),
    398       identity_(NULL),
    399       ice_tiebreaker_(rtc::CreateRandomId64()),
    400       role_switch_(false) {
    401   ASSERT(signaling_thread->IsCurrent());
    402 }
    403 
    404 BaseSession::~BaseSession() {
    405   ASSERT(signaling_thread()->IsCurrent());
    406 
    407   ASSERT(state_ != STATE_DEINIT);
    408   LogState(state_, STATE_DEINIT);
    409   state_ = STATE_DEINIT;
    410   SignalState(this, state_);
    411 
    412   for (TransportMap::iterator iter = transports_.begin();
    413        iter != transports_.end(); ++iter) {
    414     delete iter->second;
    415   }
    416 }
    417 
    418 const SessionDescription* BaseSession::local_description() const {
    419   // TODO(tommi): Assert on thread correctness.
    420   return local_description_.get();
    421 }
    422 
    423 const SessionDescription* BaseSession::remote_description() const {
    424   // TODO(tommi): Assert on thread correctness.
    425   return remote_description_.get();
    426 }
    427 
    428 SessionDescription* BaseSession::remote_description() {
    429   // TODO(tommi): Assert on thread correctness.
    430   return remote_description_.get();
    431 }
    432 
    433 void BaseSession::set_local_description(const SessionDescription* sdesc) {
    434   // TODO(tommi): Assert on thread correctness.
    435   if (sdesc != local_description_.get())
    436     local_description_.reset(sdesc);
    437 }
    438 
    439 void BaseSession::set_remote_description(SessionDescription* sdesc) {
    440   // TODO(tommi): Assert on thread correctness.
    441   if (sdesc != remote_description_)
    442     remote_description_.reset(sdesc);
    443 }
    444 
    445 const SessionDescription* BaseSession::initiator_description() const {
    446   // TODO(tommi): Assert on thread correctness.
    447   return initiator_ ? local_description_.get() : remote_description_.get();
    448 }
    449 
    450 bool BaseSession::SetIdentity(rtc::SSLIdentity* identity) {
    451   if (identity_)
    452     return false;
    453   identity_ = identity;
    454   for (TransportMap::iterator iter = transports_.begin();
    455        iter != transports_.end(); ++iter) {
    456     iter->second->SetIdentity(identity_);
    457   }
    458   return true;
    459 }
    460 
    461 bool BaseSession::PushdownTransportDescription(ContentSource source,
    462                                                ContentAction action,
    463                                                std::string* error_desc) {
    464   if (source == CS_LOCAL) {
    465     return PushdownLocalTransportDescription(local_description(),
    466                                              action,
    467                                              error_desc);
    468   }
    469   return PushdownRemoteTransportDescription(remote_description(),
    470                                             action,
    471                                             error_desc);
    472 }
    473 
    474 bool BaseSession::PushdownLocalTransportDescription(
    475     const SessionDescription* sdesc,
    476     ContentAction action,
    477     std::string* error_desc) {
    478   // Update the Transports with the right information, and trigger them to
    479   // start connecting.
    480   for (TransportMap::iterator iter = transports_.begin();
    481        iter != transports_.end(); ++iter) {
    482     // If no transport info was in this session description, ret == false
    483     // and we just skip this one.
    484     TransportDescription tdesc;
    485     bool ret = GetTransportDescription(
    486         sdesc, iter->second->content_name(), &tdesc);
    487     if (ret) {
    488       if (!iter->second->SetLocalTransportDescription(tdesc, action,
    489                                                       error_desc)) {
    490         return false;
    491       }
    492 
    493       iter->second->ConnectChannels();
    494     }
    495   }
    496 
    497   return true;
    498 }
    499 
    500 bool BaseSession::PushdownRemoteTransportDescription(
    501     const SessionDescription* sdesc,
    502     ContentAction action,
    503     std::string* error_desc) {
    504   // Update the Transports with the right information.
    505   for (TransportMap::iterator iter = transports_.begin();
    506        iter != transports_.end(); ++iter) {
    507     TransportDescription tdesc;
    508 
    509     // If no transport info was in this session description, ret == false
    510     // and we just skip this one.
    511     bool ret = GetTransportDescription(
    512         sdesc, iter->second->content_name(), &tdesc);
    513     if (ret) {
    514       if (!iter->second->SetRemoteTransportDescription(tdesc, action,
    515                                                        error_desc)) {
    516         return false;
    517       }
    518     }
    519   }
    520 
    521   return true;
    522 }
    523 
    524 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
    525                                              const std::string& channel_name,
    526                                              int component) {
    527   // We create the proxy "on demand" here because we need to support
    528   // creating channels at any time, even before we send or receive
    529   // initiate messages, which is before we create the transports.
    530   TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
    531   return transproxy->CreateChannel(channel_name, component);
    532 }
    533 
    534 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
    535                                           int component) {
    536   TransportProxy* transproxy = GetTransportProxy(content_name);
    537   if (transproxy == NULL)
    538     return NULL;
    539 
    540   return transproxy->GetChannel(component);
    541 }
    542 
    543 void BaseSession::DestroyChannel(const std::string& content_name,
    544                                  int component) {
    545   TransportProxy* transproxy = GetTransportProxy(content_name);
    546   ASSERT(transproxy != NULL);
    547   transproxy->DestroyChannel(component);
    548 }
    549 
    550 TransportProxy* BaseSession::GetOrCreateTransportProxy(
    551     const std::string& content_name) {
    552   TransportProxy* transproxy = GetTransportProxy(content_name);
    553   if (transproxy)
    554     return transproxy;
    555 
    556   Transport* transport = CreateTransport(content_name);
    557   transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
    558   transport->SetIceTiebreaker(ice_tiebreaker_);
    559   // TODO: Connect all the Transport signals to TransportProxy
    560   // then to the BaseSession.
    561   transport->SignalConnecting.connect(
    562       this, &BaseSession::OnTransportConnecting);
    563   transport->SignalWritableState.connect(
    564       this, &BaseSession::OnTransportWritable);
    565   transport->SignalRequestSignaling.connect(
    566       this, &BaseSession::OnTransportRequestSignaling);
    567   transport->SignalTransportError.connect(
    568       this, &BaseSession::OnTransportSendError);
    569   transport->SignalRouteChange.connect(
    570       this, &BaseSession::OnTransportRouteChange);
    571   transport->SignalCandidatesAllocationDone.connect(
    572       this, &BaseSession::OnTransportCandidatesAllocationDone);
    573   transport->SignalRoleConflict.connect(
    574       this, &BaseSession::OnRoleConflict);
    575   transport->SignalCompleted.connect(
    576       this, &BaseSession::OnTransportCompleted);
    577   transport->SignalFailed.connect(
    578       this, &BaseSession::OnTransportFailed);
    579 
    580   transproxy = new TransportProxy(worker_thread_, sid_, content_name,
    581                                   new TransportWrapper(transport));
    582   transproxy->SignalCandidatesReady.connect(
    583       this, &BaseSession::OnTransportProxyCandidatesReady);
    584   if (identity_)
    585     transproxy->SetIdentity(identity_);
    586   transports_[content_name] = transproxy;
    587 
    588   return transproxy;
    589 }
    590 
    591 Transport* BaseSession::GetTransport(const std::string& content_name) {
    592   TransportProxy* transproxy = GetTransportProxy(content_name);
    593   if (transproxy == NULL)
    594     return NULL;
    595   return transproxy->impl();
    596 }
    597 
    598 TransportProxy* BaseSession::GetTransportProxy(
    599     const std::string& content_name) {
    600   TransportMap::iterator iter = transports_.find(content_name);
    601   return (iter != transports_.end()) ? iter->second : NULL;
    602 }
    603 
    604 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
    605   for (TransportMap::iterator iter = transports_.begin();
    606        iter != transports_.end(); ++iter) {
    607     TransportProxy* transproxy = iter->second;
    608     if (transproxy->impl() == transport) {
    609       return transproxy;
    610     }
    611   }
    612   return NULL;
    613 }
    614 
    615 TransportProxy* BaseSession::GetFirstTransportProxy() {
    616   if (transports_.empty())
    617     return NULL;
    618   return transports_.begin()->second;
    619 }
    620 
    621 void BaseSession::DestroyTransportProxy(
    622     const std::string& content_name) {
    623   TransportMap::iterator iter = transports_.find(content_name);
    624   if (iter != transports_.end()) {
    625     delete iter->second;
    626     transports_.erase(content_name);
    627   }
    628 }
    629 
    630 cricket::Transport* BaseSession::CreateTransport(
    631     const std::string& content_name) {
    632   ASSERT(transport_type_ == NS_GINGLE_P2P);
    633   return new cricket::DtlsTransport<P2PTransport>(
    634       signaling_thread(), worker_thread(), content_name,
    635       port_allocator(), identity_);
    636 }
    637 
    638 bool BaseSession::GetStats(SessionStats* stats) {
    639   for (TransportMap::iterator iter = transports_.begin();
    640        iter != transports_.end(); ++iter) {
    641     std::string proxy_id = iter->second->content_name();
    642     // We are ignoring not-yet-instantiated transports.
    643     if (iter->second->impl()) {
    644       std::string transport_id = iter->second->impl()->content_name();
    645       stats->proxy_to_transport[proxy_id] = transport_id;
    646       if (stats->transport_stats.find(transport_id)
    647           == stats->transport_stats.end()) {
    648         TransportStats subinfos;
    649         if (!iter->second->impl()->GetStats(&subinfos)) {
    650           return false;
    651         }
    652         stats->transport_stats[transport_id] = subinfos;
    653       }
    654     }
    655   }
    656   return true;
    657 }
    658 
    659 void BaseSession::SetState(State state) {
    660   ASSERT(signaling_thread_->IsCurrent());
    661   if (state != state_) {
    662     LogState(state_, state);
    663     state_ = state;
    664     SignalState(this, state_);
    665     signaling_thread_->Post(this, MSG_STATE);
    666   }
    667   SignalNewDescription();
    668 }
    669 
    670 void BaseSession::SetError(Error error, const std::string& error_desc) {
    671   ASSERT(signaling_thread_->IsCurrent());
    672   if (error != error_) {
    673     error_ = error;
    674     error_desc_ = error_desc;
    675     SignalError(this, error);
    676   }
    677 }
    678 
    679 void BaseSession::OnSignalingReady() {
    680   ASSERT(signaling_thread()->IsCurrent());
    681   for (TransportMap::iterator iter = transports_.begin();
    682        iter != transports_.end(); ++iter) {
    683     iter->second->OnSignalingReady();
    684   }
    685 }
    686 
    687 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to
    688 // start, remove this method once everyone calls PushdownLocalTD.
    689 void BaseSession::SpeculativelyConnectAllTransportChannels() {
    690   // Put all transports into the connecting state.
    691   for (TransportMap::iterator iter = transports_.begin();
    692        iter != transports_.end(); ++iter) {
    693     iter->second->ConnectChannels();
    694   }
    695 }
    696 
    697 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
    698                                      const Candidates& candidates,
    699                                      std::string* error) {
    700   // Give candidates to the appropriate transport, and tell that transport
    701   // to start connecting, if it's not already doing so.
    702   TransportProxy* transproxy = GetTransportProxy(content_name);
    703   if (!transproxy) {
    704     *error = "Unknown content name " + content_name;
    705     return false;
    706   }
    707   if (!transproxy->OnRemoteCandidates(candidates, error)) {
    708     return false;
    709   }
    710   // TODO(juberti): Remove this call once we can be sure that we always have
    711   // a local transport description (which will trigger the connection).
    712   transproxy->ConnectChannels();
    713   return true;
    714 }
    715 
    716 bool BaseSession::MaybeEnableMuxingSupport() {
    717   // We need both a local and remote description to decide if we should mux.
    718   if ((state_ == STATE_SENTINITIATE ||
    719       state_ == STATE_RECEIVEDINITIATE) &&
    720       ((local_description_ == NULL) ||
    721       (remote_description_ == NULL))) {
    722     return false;
    723   }
    724 
    725   // In order to perform the multiplexing, we need all proxies to be in the
    726   // negotiated state, i.e. to have implementations underneath.
    727   // Ensure that this is the case, regardless of whether we are going to mux.
    728   for (TransportMap::iterator iter = transports_.begin();
    729        iter != transports_.end(); ++iter) {
    730     ASSERT(iter->second->negotiated());
    731     if (!iter->second->negotiated())
    732       return false;
    733   }
    734 
    735   // If both sides agree to BUNDLE, mux all the specified contents onto the
    736   // transport belonging to the first content name in the BUNDLE group.
    737   // If the contents are already muxed, this will be a no-op.
    738   // TODO(juberti): Should this check that local and remote have configured
    739   // BUNDLE the same way?
    740   bool candidates_allocated = IsCandidateAllocationDone();
    741   const ContentGroup* local_bundle_group =
    742       local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
    743   const ContentGroup* remote_bundle_group =
    744       remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
    745   if (local_bundle_group && remote_bundle_group &&
    746       local_bundle_group->FirstContentName()) {
    747     const std::string* content_name = local_bundle_group->FirstContentName();
    748     const ContentInfo* content =
    749         local_description_->GetContentByName(*content_name);
    750     ASSERT(content != NULL);
    751     if (!SetSelectedProxy(content->name, local_bundle_group)) {
    752       LOG(LS_WARNING) << "Failed to set up BUNDLE";
    753       return false;
    754     }
    755 
    756     // If we weren't done gathering before, we might be done now, as a result
    757     // of enabling mux.
    758     LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
    759                  << *content_name;
    760     if (!candidates_allocated) {
    761       MaybeCandidateAllocationDone();
    762     }
    763   } else {
    764     LOG(LS_INFO) << "No BUNDLE information, not bundling.";
    765   }
    766   return true;
    767 }
    768 
    769 bool BaseSession::SetSelectedProxy(const std::string& content_name,
    770                                    const ContentGroup* muxed_group) {
    771   TransportProxy* selected_proxy = GetTransportProxy(content_name);
    772   if (!selected_proxy) {
    773     return false;
    774   }
    775 
    776   ASSERT(selected_proxy->negotiated());
    777   for (TransportMap::iterator iter = transports_.begin();
    778        iter != transports_.end(); ++iter) {
    779     // If content is part of the mux group, then repoint its proxy at the
    780     // transport object that we have chosen to mux onto. If the proxy
    781     // is already pointing at the right object, it will be a no-op.
    782     if (muxed_group->HasContentName(iter->first) &&
    783         !iter->second->SetupMux(selected_proxy)) {
    784       return false;
    785     }
    786   }
    787   return true;
    788 }
    789 
    790 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
    791   // TODO(juberti): This is a clunky way of processing the done signal. Instead,
    792   // TransportProxy should receive the done signal directly, set its allocated
    793   // flag internally, and then reissue the done signal to Session.
    794   // Overall we should make TransportProxy receive *all* the signals from
    795   // Transport, since this removes the need to manually iterate over all
    796   // the transports, as is needed to make sure signals are handled properly
    797   // when BUNDLEing.
    798   // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
    799   // that make it prohibitively difficult to run dbg builds. Disabled for now.
    800   //ASSERT(!IsCandidateAllocationDone());
    801   for (TransportMap::iterator iter = transports_.begin();
    802        iter != transports_.end(); ++iter) {
    803     if (iter->second->impl() == transport) {
    804       iter->second->set_candidates_allocated(true);
    805     }
    806   }
    807   MaybeCandidateAllocationDone();
    808 }
    809 
    810 bool BaseSession::IsCandidateAllocationDone() const {
    811   for (TransportMap::const_iterator iter = transports_.begin();
    812        iter != transports_.end(); ++iter) {
    813     if (!iter->second->candidates_allocated())
    814       return false;
    815   }
    816   return true;
    817 }
    818 
    819 void BaseSession::MaybeCandidateAllocationDone() {
    820   if (IsCandidateAllocationDone()) {
    821     LOG(LS_INFO) << "Candidate gathering is complete.";
    822     OnCandidatesAllocationDone();
    823   }
    824 }
    825 
    826 void BaseSession::OnRoleConflict() {
    827   if (role_switch_) {
    828     LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
    829     return;
    830   }
    831 
    832   role_switch_ = true;
    833   for (TransportMap::iterator iter = transports_.begin();
    834        iter != transports_.end(); ++iter) {
    835     // Role will be reverse of initial role setting.
    836     IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
    837     iter->second->SetIceRole(role);
    838   }
    839 }
    840 
    841 void BaseSession::LogState(State old_state, State new_state) {
    842   LOG(LS_INFO) << "Session:" << id()
    843                << " Old state:" << StateToString(old_state)
    844                << " New state:" << StateToString(new_state)
    845                << " Type:" << content_type()
    846                << " Transport:" << transport_type();
    847 }
    848 
    849 // static
    850 bool BaseSession::GetTransportDescription(const SessionDescription* description,
    851                                           const std::string& content_name,
    852                                           TransportDescription* tdesc) {
    853   if (!description || !tdesc) {
    854     return false;
    855   }
    856   const TransportInfo* transport_info =
    857       description->GetTransportInfoByName(content_name);
    858   if (!transport_info) {
    859     return false;
    860   }
    861   *tdesc = transport_info->description;
    862   return true;
    863 }
    864 
    865 void BaseSession::SignalNewDescription() {
    866   ContentAction action;
    867   ContentSource source;
    868   if (!GetContentAction(&action, &source)) {
    869     return;
    870   }
    871   if (source == CS_LOCAL) {
    872     SignalNewLocalDescription(this, action);
    873   } else {
    874     SignalNewRemoteDescription(this, action);
    875   }
    876 }
    877 
    878 bool BaseSession::GetContentAction(ContentAction* action,
    879                                    ContentSource* source) {
    880   switch (state_) {
    881     // new local description
    882     case STATE_SENTINITIATE:
    883       *action = CA_OFFER;
    884       *source = CS_LOCAL;
    885       break;
    886     case STATE_SENTPRACCEPT:
    887       *action = CA_PRANSWER;
    888       *source = CS_LOCAL;
    889       break;
    890     case STATE_SENTACCEPT:
    891       *action = CA_ANSWER;
    892       *source = CS_LOCAL;
    893       break;
    894     // new remote description
    895     case STATE_RECEIVEDINITIATE:
    896       *action = CA_OFFER;
    897       *source = CS_REMOTE;
    898       break;
    899     case STATE_RECEIVEDPRACCEPT:
    900       *action = CA_PRANSWER;
    901       *source = CS_REMOTE;
    902       break;
    903     case STATE_RECEIVEDACCEPT:
    904       *action = CA_ANSWER;
    905       *source = CS_REMOTE;
    906       break;
    907     default:
    908       return false;
    909   }
    910   return true;
    911 }
    912 
    913 void BaseSession::OnMessage(rtc::Message *pmsg) {
    914   switch (pmsg->message_id) {
    915   case MSG_TIMEOUT:
    916     // Session timeout has occured.
    917     SetError(ERROR_TIME, "Session timeout has occured.");
    918     break;
    919 
    920   case MSG_STATE:
    921     switch (state_) {
    922     case STATE_SENTACCEPT:
    923     case STATE_RECEIVEDACCEPT:
    924       SetState(STATE_INPROGRESS);
    925       break;
    926 
    927     default:
    928       // Explicitly ignoring some states here.
    929       break;
    930     }
    931     break;
    932   }
    933 }
    934 
    935 Session::Session(SessionManager* session_manager,
    936                  const std::string& local_name,
    937                  const std::string& initiator_name,
    938                  const std::string& sid,
    939                  const std::string& content_type,
    940                  SessionClient* client)
    941     : BaseSession(session_manager->signaling_thread(),
    942                   session_manager->worker_thread(),
    943                   session_manager->port_allocator(),
    944                   sid, content_type, initiator_name == local_name) {
    945   ASSERT(client != NULL);
    946   session_manager_ = session_manager;
    947   local_name_ = local_name;
    948   initiator_name_ = initiator_name;
    949   transport_parser_ = new P2PTransportParser();
    950   client_ = client;
    951   initiate_acked_ = false;
    952   current_protocol_ = PROTOCOL_HYBRID;
    953 }
    954 
    955 Session::~Session() {
    956   delete transport_parser_;
    957 }
    958 
    959 bool Session::Initiate(const std::string& to,
    960                        const SessionDescription* sdesc) {
    961   ASSERT(signaling_thread()->IsCurrent());
    962   SessionError error;
    963 
    964   // Only from STATE_INIT
    965   if (state() != STATE_INIT)
    966     return false;
    967 
    968   // Setup for signaling.
    969   set_remote_name(to);
    970   set_local_description(sdesc);
    971   if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
    972                               &error)) {
    973     LOG(LS_ERROR) << "Could not create transports: " << error.text;
    974     return false;
    975   }
    976 
    977   if (!SendInitiateMessage(sdesc, &error)) {
    978     LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
    979     return false;
    980   }
    981 
    982   // We need to connect transport proxy and impl here so that we can process
    983   // the TransportDescriptions.
    984   SpeculativelyConnectAllTransportChannels();
    985 
    986   PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL);
    987   SetState(Session::STATE_SENTINITIATE);
    988   return true;
    989 }
    990 
    991 bool Session::Accept(const SessionDescription* sdesc) {
    992   ASSERT(signaling_thread()->IsCurrent());
    993 
    994   // Only if just received initiate
    995   if (state() != STATE_RECEIVEDINITIATE)
    996     return false;
    997 
    998   // Setup for signaling.
    999   set_local_description(sdesc);
   1000 
   1001   SessionError error;
   1002   if (!SendAcceptMessage(sdesc, &error)) {
   1003     LOG(LS_ERROR) << "Could not send accept message: " << error.text;
   1004     return false;
   1005   }
   1006   // TODO(juberti): Add BUNDLE support to transport-info messages.
   1007   PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL);
   1008   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
   1009   SetState(Session::STATE_SENTACCEPT);
   1010   return true;
   1011 }
   1012 
   1013 bool Session::Reject(const std::string& reason) {
   1014   ASSERT(signaling_thread()->IsCurrent());
   1015 
   1016   // Reject is sent in response to an initiate or modify, to reject the
   1017   // request
   1018   if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
   1019     return false;
   1020 
   1021   SessionError error;
   1022   if (!SendRejectMessage(reason, &error)) {
   1023     LOG(LS_ERROR) << "Could not send reject message: " << error.text;
   1024     return false;
   1025   }
   1026 
   1027   SetState(STATE_SENTREJECT);
   1028   return true;
   1029 }
   1030 
   1031 bool Session::TerminateWithReason(const std::string& reason) {
   1032   ASSERT(signaling_thread()->IsCurrent());
   1033 
   1034   // Either side can terminate, at any time.
   1035   switch (state()) {
   1036     case STATE_SENTTERMINATE:
   1037     case STATE_RECEIVEDTERMINATE:
   1038       return false;
   1039 
   1040     case STATE_SENTREJECT:
   1041     case STATE_RECEIVEDREJECT:
   1042       // We don't need to send terminate if we sent or received a reject...
   1043       // it's implicit.
   1044       break;
   1045 
   1046     default:
   1047       SessionError error;
   1048       if (!SendTerminateMessage(reason, &error)) {
   1049         LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
   1050         return false;
   1051       }
   1052       break;
   1053   }
   1054 
   1055   SetState(STATE_SENTTERMINATE);
   1056   return true;
   1057 }
   1058 
   1059 bool Session::SendInfoMessage(const XmlElements& elems,
   1060                               const std::string& remote_name) {
   1061   ASSERT(signaling_thread()->IsCurrent());
   1062   SessionError error;
   1063   if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
   1064     LOG(LS_ERROR) << "Could not send info message " << error.text;
   1065     return false;
   1066   }
   1067   return true;
   1068 }
   1069 
   1070 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
   1071   XmlElements elems;
   1072   WriteError write_error;
   1073   if (!WriteDescriptionInfo(current_protocol_,
   1074                             contents,
   1075                             GetContentParsers(),
   1076                             &elems, &write_error)) {
   1077     LOG(LS_ERROR) << "Could not write description info message: "
   1078                   << write_error.text;
   1079     return false;
   1080   }
   1081   SessionError error;
   1082   if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
   1083     LOG(LS_ERROR) << "Could not send description info message: "
   1084                   << error.text;
   1085     return false;
   1086   }
   1087   return true;
   1088 }
   1089 
   1090 TransportInfos Session::GetEmptyTransportInfos(
   1091     const ContentInfos& contents) const {
   1092   TransportInfos tinfos;
   1093   for (ContentInfos::const_iterator content = contents.begin();
   1094        content != contents.end(); ++content) {
   1095     tinfos.push_back(TransportInfo(content->name,
   1096                                    TransportDescription(transport_type(),
   1097                                                         std::string(),
   1098                                                         std::string())));
   1099   }
   1100   return tinfos;
   1101 }
   1102 
   1103 bool Session::OnRemoteCandidates(
   1104     const TransportInfos& tinfos, ParseError* error) {
   1105   for (TransportInfos::const_iterator tinfo = tinfos.begin();
   1106        tinfo != tinfos.end(); ++tinfo) {
   1107     std::string str_error;
   1108     if (!BaseSession::OnRemoteCandidates(
   1109         tinfo->content_name, tinfo->description.candidates, &str_error)) {
   1110       return BadParse(str_error, error);
   1111     }
   1112   }
   1113   return true;
   1114 }
   1115 
   1116 bool Session::CreateTransportProxies(const TransportInfos& tinfos,
   1117                                      SessionError* error) {
   1118   for (TransportInfos::const_iterator tinfo = tinfos.begin();
   1119        tinfo != tinfos.end(); ++tinfo) {
   1120     if (tinfo->description.transport_type != transport_type()) {
   1121       error->SetText("No supported transport in offer.");
   1122       return false;
   1123     }
   1124 
   1125     GetOrCreateTransportProxy(tinfo->content_name);
   1126   }
   1127   return true;
   1128 }
   1129 
   1130 TransportParserMap Session::GetTransportParsers() {
   1131   TransportParserMap parsers;
   1132   parsers[transport_type()] = transport_parser_;
   1133   return parsers;
   1134 }
   1135 
   1136 CandidateTranslatorMap Session::GetCandidateTranslators() {
   1137   CandidateTranslatorMap translators;
   1138   // NOTE: This technique makes it impossible to parse G-ICE
   1139   // candidates in session-initiate messages because the channels
   1140   // aren't yet created at that point.  Since we don't use candidates
   1141   // in session-initiate messages, we should be OK.  Once we switch to
   1142   // ICE, this translation shouldn't be necessary.
   1143   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1144        iter != transport_proxies().end(); ++iter) {
   1145     translators[iter->first] = iter->second;
   1146   }
   1147   return translators;
   1148 }
   1149 
   1150 ContentParserMap Session::GetContentParsers() {
   1151   ContentParserMap parsers;
   1152   parsers[content_type()] = client_;
   1153   // We need to be able parse both RTP-based and SCTP-based Jingle
   1154   // with the same client.
   1155   if (content_type() == NS_JINGLE_RTP) {
   1156     parsers[NS_JINGLE_DRAFT_SCTP] = client_;
   1157   }
   1158   return parsers;
   1159 }
   1160 
   1161 void Session::OnTransportRequestSignaling(Transport* transport) {
   1162   ASSERT(signaling_thread()->IsCurrent());
   1163   TransportProxy* transproxy = GetTransportProxy(transport);
   1164   ASSERT(transproxy != NULL);
   1165   if (transproxy) {
   1166     // Reset candidate allocation status for the transport proxy.
   1167     transproxy->set_candidates_allocated(false);
   1168   }
   1169   SignalRequestSignaling(this);
   1170 }
   1171 
   1172 void Session::OnTransportConnecting(Transport* transport) {
   1173   // This is an indication that we should begin watching the writability
   1174   // state of the transport.
   1175   OnTransportWritable(transport);
   1176 }
   1177 
   1178 void Session::OnTransportWritable(Transport* transport) {
   1179   ASSERT(signaling_thread()->IsCurrent());
   1180 
   1181   // If the transport is not writable, start a timer to make sure that it
   1182   // becomes writable within a reasonable amount of time.  If it does not, we
   1183   // terminate since we can't actually send data.  If the transport is writable,
   1184   // cancel the timer.  Note that writability transitions may occur repeatedly
   1185   // during the lifetime of the session.
   1186   signaling_thread()->Clear(this, MSG_TIMEOUT);
   1187   if (transport->HasChannels() && !transport->writable()) {
   1188     signaling_thread()->PostDelayed(
   1189         session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
   1190   }
   1191 }
   1192 
   1193 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
   1194                                               const Candidates& candidates) {
   1195   ASSERT(signaling_thread()->IsCurrent());
   1196   if (transproxy != NULL) {
   1197     if (initiator() && !initiate_acked_) {
   1198       // TODO: This is to work around server re-ordering
   1199       // messages.  We send the candidates once the session-initiate
   1200       // is acked.  Once we have fixed the server to guarantee message
   1201       // order, we can remove this case.
   1202       transproxy->AddUnsentCandidates(candidates);
   1203     } else {
   1204       if (!transproxy->negotiated()) {
   1205         transproxy->AddSentCandidates(candidates);
   1206       }
   1207       SessionError error;
   1208       if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
   1209         LOG(LS_ERROR) << "Could not send transport info message: "
   1210                       << error.text;
   1211         return;
   1212       }
   1213     }
   1214   }
   1215 }
   1216 
   1217 void Session::OnTransportSendError(Transport* transport,
   1218                                    const buzz::XmlElement* stanza,
   1219                                    const buzz::QName& name,
   1220                                    const std::string& type,
   1221                                    const std::string& text,
   1222                                    const buzz::XmlElement* extra_info) {
   1223   ASSERT(signaling_thread()->IsCurrent());
   1224   SignalErrorMessage(this, stanza, name, type, text, extra_info);
   1225 }
   1226 
   1227 void Session::OnIncomingMessage(const SessionMessage& msg) {
   1228   ASSERT(signaling_thread()->IsCurrent());
   1229   ASSERT(state() == STATE_INIT || msg.from == remote_name());
   1230 
   1231   if (current_protocol_== PROTOCOL_HYBRID) {
   1232     if (msg.protocol == PROTOCOL_GINGLE) {
   1233       current_protocol_ = PROTOCOL_GINGLE;
   1234     } else {
   1235       current_protocol_ = PROTOCOL_JINGLE;
   1236     }
   1237   }
   1238 
   1239   bool valid = false;
   1240   MessageError error;
   1241   switch (msg.type) {
   1242     case ACTION_SESSION_INITIATE:
   1243       valid = OnInitiateMessage(msg, &error);
   1244       break;
   1245     case ACTION_SESSION_INFO:
   1246       valid = OnInfoMessage(msg);
   1247       break;
   1248     case ACTION_SESSION_ACCEPT:
   1249       valid = OnAcceptMessage(msg, &error);
   1250       break;
   1251     case ACTION_SESSION_REJECT:
   1252       valid = OnRejectMessage(msg, &error);
   1253       break;
   1254     case ACTION_SESSION_TERMINATE:
   1255       valid = OnTerminateMessage(msg, &error);
   1256       break;
   1257     case ACTION_TRANSPORT_INFO:
   1258       valid = OnTransportInfoMessage(msg, &error);
   1259       break;
   1260     case ACTION_TRANSPORT_ACCEPT:
   1261       valid = OnTransportAcceptMessage(msg, &error);
   1262       break;
   1263     case ACTION_DESCRIPTION_INFO:
   1264       valid = OnDescriptionInfoMessage(msg, &error);
   1265       break;
   1266     default:
   1267       valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
   1268                          "unknown session message type",
   1269                          &error);
   1270   }
   1271 
   1272   if (valid) {
   1273     SendAcknowledgementMessage(msg.stanza);
   1274   } else {
   1275     SignalErrorMessage(this, msg.stanza, error.type,
   1276                        "modify", error.text, NULL);
   1277   }
   1278 }
   1279 
   1280 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
   1281                                  const buzz::XmlElement* response_stanza,
   1282                                  const SessionMessage& msg) {
   1283   ASSERT(signaling_thread()->IsCurrent());
   1284 
   1285   if (msg.type == ACTION_SESSION_INITIATE) {
   1286     OnInitiateAcked();
   1287   }
   1288 }
   1289 
   1290 void Session::OnInitiateAcked() {
   1291   // TODO: This is to work around server re-ordering
   1292   // messages.  We send the candidates once the session-initiate
   1293   // is acked.  Once we have fixed the server to guarantee message
   1294   // order, we can remove this case.
   1295   if (!initiate_acked_) {
   1296     initiate_acked_ = true;
   1297     SessionError error;
   1298     SendAllUnsentTransportInfoMessages(&error);
   1299   }
   1300 }
   1301 
   1302 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
   1303                            const buzz::XmlElement* error_stanza) {
   1304   ASSERT(signaling_thread()->IsCurrent());
   1305 
   1306   SessionMessage msg;
   1307   ParseError parse_error;
   1308   if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
   1309     LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
   1310                   << ":" << orig_stanza;
   1311     return;
   1312   }
   1313 
   1314   // If the error is a session redirect, call OnRedirectError, which will
   1315   // continue the session with a new remote JID.
   1316   SessionRedirect redirect;
   1317   if (FindSessionRedirect(error_stanza, &redirect)) {
   1318     SessionError error;
   1319     if (!OnRedirectError(redirect, &error)) {
   1320       // TODO: Should we send a message back?  The standard
   1321       // says nothing about it.
   1322       std::ostringstream desc;
   1323       desc << "Failed to redirect: " << error.text;
   1324       LOG(LS_ERROR) << desc.str();
   1325       SetError(ERROR_RESPONSE, desc.str());
   1326     }
   1327     return;
   1328   }
   1329 
   1330   std::string error_type = "cancel";
   1331 
   1332   const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
   1333   if (error) {
   1334     error_type = error->Attr(buzz::QN_TYPE);
   1335 
   1336     LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
   1337                   << "in response to:\n" << orig_stanza->Str();
   1338   } else {
   1339     // don't crash if <error> is missing
   1340     LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
   1341     return;
   1342   }
   1343 
   1344   if (msg.type == ACTION_TRANSPORT_INFO) {
   1345     // Transport messages frequently generate errors because they are sent right
   1346     // when we detect a network failure.  For that reason, we ignore such
   1347     // errors, because if we do not establish writability again, we will
   1348     // terminate anyway.  The exceptions are transport-specific error tags,
   1349     // which we pass on to the respective transport.
   1350   } else if ((error_type != "continue") && (error_type != "wait")) {
   1351     // We do not set an error if the other side said it is okay to continue
   1352     // (possibly after waiting).  These errors can be ignored.
   1353     SetError(ERROR_RESPONSE, "");
   1354   }
   1355 }
   1356 
   1357 bool Session::OnInitiateMessage(const SessionMessage& msg,
   1358                                 MessageError* error) {
   1359   if (!CheckState(STATE_INIT, error))
   1360     return false;
   1361 
   1362   SessionInitiate init;
   1363   if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
   1364                             GetContentParsers(), GetTransportParsers(),
   1365                             GetCandidateTranslators(),
   1366                             &init, error))
   1367     return false;
   1368 
   1369   SessionError session_error;
   1370   if (!CreateTransportProxies(init.transports, &session_error)) {
   1371     return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
   1372                       session_error.text, error);
   1373   }
   1374 
   1375   set_remote_name(msg.from);
   1376   set_initiator_name(msg.initiator);
   1377   set_remote_description(new SessionDescription(init.ClearContents(),
   1378                                                 init.transports,
   1379                                                 init.groups));
   1380   // Updating transport with TransportDescription.
   1381   PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL);
   1382   SetState(STATE_RECEIVEDINITIATE);
   1383 
   1384   // Users of Session may listen to state change and call Reject().
   1385   if (state() != STATE_SENTREJECT) {
   1386     if (!OnRemoteCandidates(init.transports, error))
   1387       return false;
   1388 
   1389     // TODO(juberti): Auto-generate and push down the local transport answer.
   1390     // This is necessary for trickling to work with RFC 5245 ICE.
   1391   }
   1392   return true;
   1393 }
   1394 
   1395 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
   1396   if (!CheckState(STATE_SENTINITIATE, error))
   1397     return false;
   1398 
   1399   SessionAccept accept;
   1400   if (!ParseSessionAccept(msg.protocol, msg.action_elem,
   1401                           GetContentParsers(), GetTransportParsers(),
   1402                           GetCandidateTranslators(),
   1403                           &accept, error)) {
   1404     return false;
   1405   }
   1406 
   1407   // If we get an accept, we can assume the initiate has been
   1408   // received, even if we haven't gotten an IQ response.
   1409   OnInitiateAcked();
   1410 
   1411   set_remote_description(new SessionDescription(accept.ClearContents(),
   1412                                                 accept.transports,
   1413                                                 accept.groups));
   1414   // Updating transport with TransportDescription.
   1415   PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL);
   1416   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
   1417   SetState(STATE_RECEIVEDACCEPT);
   1418 
   1419   if (!OnRemoteCandidates(accept.transports, error))
   1420     return false;
   1421 
   1422   return true;
   1423 }
   1424 
   1425 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
   1426   if (!CheckState(STATE_SENTINITIATE, error))
   1427     return false;
   1428 
   1429   SetState(STATE_RECEIVEDREJECT);
   1430   return true;
   1431 }
   1432 
   1433 bool Session::OnInfoMessage(const SessionMessage& msg) {
   1434   SignalInfoMessage(this, msg.action_elem);
   1435   return true;
   1436 }
   1437 
   1438 bool Session::OnTerminateMessage(const SessionMessage& msg,
   1439                                  MessageError* error) {
   1440   SessionTerminate term;
   1441   if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
   1442     return false;
   1443 
   1444   SignalReceivedTerminateReason(this, term.reason);
   1445   if (term.debug_reason != buzz::STR_EMPTY) {
   1446     LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
   1447   }
   1448 
   1449   SetState(STATE_RECEIVEDTERMINATE);
   1450   return true;
   1451 }
   1452 
   1453 bool Session::OnTransportInfoMessage(const SessionMessage& msg,
   1454                                      MessageError* error) {
   1455   TransportInfos tinfos;
   1456   if (!ParseTransportInfos(msg.protocol, msg.action_elem,
   1457                            initiator_description()->contents(),
   1458                            GetTransportParsers(), GetCandidateTranslators(),
   1459                            &tinfos, error))
   1460     return false;
   1461 
   1462   if (!OnRemoteCandidates(tinfos, error))
   1463     return false;
   1464 
   1465   return true;
   1466 }
   1467 
   1468 bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
   1469                                        MessageError* error) {
   1470   // TODO: Currently here only for compatibility with
   1471   // Gingle 1.1 clients (notably, Google Voice).
   1472   return true;
   1473 }
   1474 
   1475 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
   1476                               MessageError* error) {
   1477   if (!CheckState(STATE_INPROGRESS, error))
   1478     return false;
   1479 
   1480   DescriptionInfo description_info;
   1481   if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
   1482                             GetContentParsers(), GetTransportParsers(),
   1483                             GetCandidateTranslators(),
   1484                             &description_info, error)) {
   1485     return false;
   1486   }
   1487 
   1488   ContentInfos& updated_contents = description_info.contents;
   1489 
   1490   // TODO: Currently, reflector sends back
   1491   // video stream updates even for an audio-only call, which causes
   1492   // this to fail.  Put this back once reflector is fixed.
   1493   //
   1494   // ContentInfos::iterator it;
   1495   // First, ensure all updates are valid before modifying remote_description_.
   1496   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
   1497   //   if (remote_description()->GetContentByName(it->name) == NULL) {
   1498   //     return false;
   1499   //   }
   1500   // }
   1501 
   1502   // TODO: We used to replace contents from an update, but
   1503   // that no longer works with partial updates.  We need to figure out
   1504   // a way to merge patial updates into contents.  For now, users of
   1505   // Session should listen to SignalRemoteDescriptionUpdate and handle
   1506   // updates.  They should not expect remote_description to be the
   1507   // latest value.
   1508   //
   1509   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
   1510   //     remote_description()->RemoveContentByName(it->name);
   1511   //     remote_description()->AddContent(it->name, it->type, it->description);
   1512   //   }
   1513   // }
   1514 
   1515   SignalRemoteDescriptionUpdate(this, updated_contents);
   1516   return true;
   1517 }
   1518 
   1519 bool BareJidsEqual(const std::string& name1,
   1520                    const std::string& name2) {
   1521   buzz::Jid jid1(name1);
   1522   buzz::Jid jid2(name2);
   1523 
   1524   return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
   1525 }
   1526 
   1527 bool Session::OnRedirectError(const SessionRedirect& redirect,
   1528                               SessionError* error) {
          // Reacts to a redirect by retargeting the session at redirect.target and
          // sending the initiate and transport-info messages again.
          //
          // Preconditions checked here:
          //   - the session must be in STATE_SENTINITIATE (via CheckState);
          //   - redirect.target must share a bare JID with the current remote_name().
          // When either check fails, the result of BadWrite(...) is returned.
          // NOTE(review): BadWrite is defined elsewhere; presumably it stores the
          // text in |error| and returns false -- confirm.
          //
          // Otherwise returns true only if both SendInitiateMessage and
          // ResendAllTransportInfoMessages succeed (the second is skipped when the
          // first fails, because of the short-circuiting &&).
   1529   MessageError message_error;
   1530   if (!CheckState(STATE_SENTINITIATE, &message_error)) {
            // Forward the text CheckState put into message_error as a write error.
   1531     return BadWrite(message_error.text, error);
   1532   }
   1533 
   1534   if (!BareJidsEqual(remote_name(), redirect.target))
   1535     return BadWrite("Redirection not allowed: must be the same bare jid.",
   1536                     error);
   1537 
   1538   // When we receive a redirect, we point the session at the new JID
   1539   // and resend the candidates.
   1540   set_remote_name(redirect.target);
   1541   return (SendInitiateMessage(local_description(), error) &&
   1542           ResendAllTransportInfoMessages(error));
   1543 }
   1544 
   1545 bool Session::CheckState(State expected, MessageError* error) {
          // Returns true when the session's current state() equals |expected|.
          // Otherwise returns whatever BadMessage(...) yields for a
          // buzz::QN_STANZA_NOT_ALLOWED error carrying the text below.
          // NOTE(review): BadMessage is defined elsewhere; presumably it fills
          // |error| and returns false -- confirm.
   1546   if (state() != expected) {
   1547     // The server can deliver messages out of order/repeated for various
   1548     // reasons. For example, if the server does not receive our iq response,
   1549     // it could assume that the iq it sent was lost, and will then send
   1550     // it again. Ideally, we should implement reliable messaging with
   1551     // duplicate elimination.
   1552     return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
   1553                       "message not allowed in current state",
   1554                       error);
   1555   }
   1556   return true;
   1557 }
   1558 
   1559 void Session::SetError(Error error, const std::string& error_desc) {
          // Delegates to BaseSession::SetError, then, for any error other than
          // ERROR_NONE, posts MSG_ERROR to the signaling thread with this session
          // as the handler. Session::OnMessage reacts to MSG_ERROR by calling
          // TerminateWithReason(STR_TERMINATE_ERROR).
   1560   BaseSession::SetError(error, error_desc);
   1561   if (error != ERROR_NONE)
   1562     signaling_thread()->Post(this, MSG_ERROR);
   1563 }
   1564 
   1565 void Session::OnMessage(rtc::Message* pmsg) {
          // Message handler for this session. BaseSession::OnMessage runs first;
          // afterwards this method adds its own handling for two message ids:
          //   MSG_ERROR - terminate the session with STR_TERMINATE_ERROR.
          //   MSG_STATE - act on the state the session had *before* the base
          //               handler ran (see orig_state):
          //                 SENTREJECT / RECEIVEDREJECT       -> Terminate()
          //                 SENTTERMINATE / RECEIVEDTERMINATE -> DestroySession(this)
          //                 anything else                     -> ignored
          // Other message ids get no extra handling here.
   1566   // preserve this because BaseSession::OnMessage may modify it
   1567   State orig_state = state();
   1568 
   1569   BaseSession::OnMessage(pmsg);
   1570 
   1571   switch (pmsg->message_id) {
   1572   case MSG_ERROR:
   1573     TerminateWithReason(STR_TERMINATE_ERROR);
   1574     break;
   1575 
   1576   case MSG_STATE:
   1577     switch (orig_state) {
   1578     case STATE_SENTREJECT:
   1579     case STATE_RECEIVEDREJECT:
   1580       // Assume clean termination.
   1581       Terminate();
   1582       break;
   1583 
   1584     case STATE_SENTTERMINATE:
   1585     case STATE_RECEIVEDTERMINATE:
              // NOTE(review): presumably this deletes the session object; no member
              // is accessed after this call on this path -- keep it that way.
   1586       session_manager_->DestroySession(this);
   1587       break;
   1588 
   1589     default:
   1590       // Explicitly ignoring some states here.
   1591       break;
   1592     }
   1593     break;
   1594   }
   1595 }
   1596 
   1597 bool Session::SendInitiateMessage(const SessionDescription* sdesc,
   1598                                   SessionError* error) {
          // Builds a SessionInitiate from |sdesc| (its contents and groups) and
          // sends it as ACTION_SESSION_INITIATE. The transports field is filled
          // with GetEmptyTransportInfos(init.contents). |sdesc| is dereferenced
          // without a null check. Returns the result of SendMessage.
   1599   SessionInitiate init;
   1600   init.contents = sdesc->contents();
   1601   init.transports = GetEmptyTransportInfos(init.contents);
   1602   init.groups = sdesc->groups();
   1603   return SendMessage(ACTION_SESSION_INITIATE, init, error);
   1604 }
   1605 
   1606 bool Session::WriteSessionAction(
   1607     SignalingProtocol protocol, const SessionInitiate& init,
   1608     XmlElements* elems, WriteError* error) {
          // SessionInitiate overload: serializes |init| into |elems| for the given
          // |protocol| by forwarding its contents, transports and groups to
          // WriteSessionInitiate, together with this session's content parsers,
          // transport parsers and candidate translators. Returns that call's
          // result.
   1609   return WriteSessionInitiate(protocol, init.contents, init.transports,
   1610                               GetContentParsers(), GetTransportParsers(),
   1611                               GetCandidateTranslators(), init.groups,
   1612                               elems, error);
   1613 }
   1614 
   1615 bool Session::SendAcceptMessage(const SessionDescription* sdesc,
   1616                                 SessionError* error) {
          // Serializes a session-accept for |sdesc| and sends it as
          // ACTION_SESSION_ACCEPT. The elements are written here directly with
          // current_protocol_ and then handed to the XmlElements overload of
          // SendMessage, so this path does not go through the templated
          // WriteActionMessage. Returns false if writing fails; otherwise returns
          // the result of SendMessage. |sdesc| is dereferenced without a null
          // check.
   1617   XmlElements elems;
   1618   if (!WriteSessionAccept(current_protocol_,
   1619                           sdesc->contents(),
   1620                           GetEmptyTransportInfos(sdesc->contents()),
   1621                           GetContentParsers(), GetTransportParsers(),
   1622                           GetCandidateTranslators(), sdesc->groups(),
   1623                           &elems, error)) {
   1624     return false;
   1625   }
   1626   return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
   1627 }
   1628 
   1629 bool Session::SendRejectMessage(const std::string& reason,
   1630                                 SessionError* error) {
          // Wraps |reason| in a SessionTerminate and sends it with the
          // ACTION_SESSION_REJECT action type. Returns the result of SendMessage.
   1631   SessionTerminate term(reason);
   1632   return SendMessage(ACTION_SESSION_REJECT, term, error);
   1633 }
   1634 
   1635 bool Session::SendTerminateMessage(const std::string& reason,
   1636                                    SessionError* error) {
          // Wraps |reason| in a SessionTerminate and sends it with the
          // ACTION_SESSION_TERMINATE action type. Returns the result of
          // SendMessage.
   1637   SessionTerminate term(reason);
   1638   return SendMessage(ACTION_SESSION_TERMINATE, term, error);
   1639 }
   1640 
   1641 bool Session::WriteSessionAction(SignalingProtocol protocol,
   1642                                  const SessionTerminate& term,
   1643                                  XmlElements* elems, WriteError* error) {
          // SessionTerminate overload: serializes |term| into |elems| via
          // WriteSessionTerminate. Always returns true; |error| is not used here.
   1644   WriteSessionTerminate(protocol, term, elems);
   1645   return true;
   1646 }
   1647 
   1648 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
   1649                                        SessionError* error) {
          // Sends |tinfo| to the remote side as an ACTION_TRANSPORT_INFO message and
          // returns the result of SendMessage.
   1650   return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
   1651 }
   1652 
   1653 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
   1654                                        const Candidates& candidates,
   1655                                        SessionError* error) {
          // Convenience overload: builds a TransportInfo named after
          // transproxy->content_name(), whose TransportDescription carries the
          // proxy's transport type and |candidates|, then sends it through the
          // TransportInfo overload. The remaining description arguments are
          // placeholders: an empty string vector, two empty strings, ICEMODE_FULL,
          // CONNECTIONROLE_NONE and NULL.
          // NOTE(review): presumably those are transport options, ICE ufrag/pwd
          // and the identity fingerprint -- confirm against TransportDescription.
   1656   return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
   1657       TransportDescription(transproxy->type(), std::vector<std::string>(),
   1658                            std::string(), std::string(), ICEMODE_FULL,
   1659                            CONNECTIONROLE_NONE, NULL, candidates)), error);
   1660 }
   1661 
   1662 bool Session::WriteSessionAction(SignalingProtocol protocol,
   1663                                  const TransportInfo& tinfo,
   1664                                  XmlElements* elems, WriteError* error) {
          // TransportInfo overload: WriteTransportInfos takes a TransportInfos
          // collection, so the single |tinfo| is copied into a one-element
          // collection before being serialized into |elems|. Returns the result of
          // WriteTransportInfos.
   1665   TransportInfos tinfos;
   1666   tinfos.push_back(tinfo);
   1667   return WriteTransportInfos(protocol, tinfos,
   1668                              GetTransportParsers(), GetCandidateTranslators(),
   1669                              elems, error);
   1670 }
   1671 
   1672 bool Session::ResendAllTransportInfoMessages(SessionError* error) {
          // Walks every transport proxy and, for each one that has a non-empty
          // sent_candidates() list, sends those candidates again in a
          // transport-info message and then clears the proxy's sent list.
          //
          // Stops at the first failed send: logs error->text, returns false, and
          // leaves that proxy's sent list (and all remaining proxies) untouched.
          // Returns true when every send succeeded, or when there was nothing to
          // resend.
   1673   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1674        iter != transport_proxies().end(); ++iter) {
   1675     TransportProxy* transproxy = iter->second;
   1676     if (transproxy->sent_candidates().size() > 0) {
   1677       if (!SendTransportInfoMessage(
   1678               transproxy, transproxy->sent_candidates(), error)) {
   1679         LOG(LS_ERROR) << "Could not resend transport info messages: "
   1680                       << error->text;
   1681         return false;
   1682       }
              // NOTE(review): the sent list is cleared after a successful resend;
              // whether the send path records the candidates as sent again is not
              // visible here -- confirm in TransportProxy.
   1683       transproxy->ClearSentCandidates();
   1684     }
   1685   }
   1686   return true;
   1687 }
   1688 
   1689 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
          // Walks every transport proxy and, for each one that has a non-empty
          // unsent_candidates() list, sends those candidates in a transport-info
          // message and then clears the proxy's unsent list.
          //
          // Stops at the first failed send: logs error->text, returns false, and
          // leaves that proxy's unsent list (and all remaining proxies) untouched.
          // Returns true when every send succeeded, or when nothing was pending.
   1690   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1691        iter != transport_proxies().end(); ++iter) {
   1692     TransportProxy* transproxy = iter->second;
   1693     if (transproxy->unsent_candidates().size() > 0) {
   1694       if (!SendTransportInfoMessage(
   1695               transproxy, transproxy->unsent_candidates(), error)) {
   1696         LOG(LS_ERROR) << "Could not send unsent transport info messages: "
   1697                       << error->text;
   1698         return false;
   1699       }
   1700       transproxy->ClearUnsentCandidates();
   1701     }
   1702   }
   1703   return true;
   1704 }
   1705 
   1706 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
   1707                           SessionError* error) {
            // Sends already-serialized action elements to the session's current
            // remote_name() by forwarding to the overload that takes an explicit
            // recipient.
   1708     return SendMessage(type, action_elems, remote_name(), error);
   1709 }
   1710 
   1711 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
   1712                           const std::string& remote_name, SessionError* error) {
          // Wraps already-serialized |action_elems| in a new IQ stanza addressed to
          // |remote_name| and emits it through SignalOutgoingMessage. The message
          // header is built from current_protocol_, |type|, id() and
          // initiator_name(). Always returns true; |error| is never written here.
          // The stanza is held by a scoped_ptr local to this call, and listeners
          // receive only the raw pointer from get().
   1713   rtc::scoped_ptr<buzz::XmlElement> stanza(
   1714       new buzz::XmlElement(buzz::QN_IQ));
   1715 
   1716   SessionMessage msg(current_protocol_, type, id(), initiator_name());
   1717   msg.to = remote_name;
   1718   WriteSessionMessage(msg, action_elems, stanza.get());
   1719 
   1720   SignalOutgoingMessage(this, stanza.get());
   1721   return true;
   1722 }
   1723 
   1724 template <typename Action>
   1725 bool Session::SendMessage(ActionType type, const Action& action,
   1726                           SessionError* error) {
          // Typed-action variant: serializes |action| into a new IQ stanza with
          // WriteActionMessage and emits it through SignalOutgoingMessage.
          // Returns false, without signaling anything, if serialization fails;
          // returns true otherwise. The stanza is held by a scoped_ptr local to
          // this call, and listeners receive only the raw pointer from get().
   1727   rtc::scoped_ptr<buzz::XmlElement> stanza(
   1728       new buzz::XmlElement(buzz::QN_IQ));
   1729   if (!WriteActionMessage(type, action, stanza.get(), error))
   1730     return false;
   1731 
   1732   SignalOutgoingMessage(this, stanza.get());
   1733   return true;
   1734 }
   1735 
   1736 template <typename Action>
   1737 bool Session::WriteActionMessage(ActionType type, const Action& action,
   1738                                  buzz::XmlElement* stanza,
   1739                                  WriteError* error) {
          // Writes |action| into |stanza| using the session's current protocol.
          // When current_protocol_ is PROTOCOL_HYBRID the action is written twice
          // into the same stanza, first as PROTOCOL_JINGLE and then as
          // PROTOCOL_GINGLE; otherwise it is written once with current_protocol_.
          // Returns false as soon as any write fails, true otherwise.
   1740   if (current_protocol_ == PROTOCOL_HYBRID) {
   1741     if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
   1742       return false;
   1743     if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
   1744       return false;
   1745   } else {
   1746     if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
   1747       return false;
   1748   }
   1749   return true;
   1750 }
   1751 
   1752 template <typename Action>
   1753 bool Session::WriteActionMessage(SignalingProtocol protocol,
   1754                                  ActionType type, const Action& action,
   1755                                  buzz::XmlElement* stanza, WriteError* error) {
          // Writes |action| into |stanza| for one specific |protocol|:
          //   1. serialize the action with the WriteSessionAction overload that
          //      matches the Action type (returns false if that fails);
          //   2. build the message header from |protocol|, |type|, id() and
          //      initiator_name(), addressed to remote_name();
          //   3. write the header and action elements into |stanza| with
          //      WriteSessionMessage.
          // Returns true once the message has been written.
   1756   XmlElements action_elems;
   1757   if (!WriteSessionAction(protocol, action, &action_elems, error))
   1758     return false;
   1759 
   1760   SessionMessage msg(protocol, type, id(), initiator_name());
   1761   msg.to = remote_name();
   1762 
   1763   WriteSessionMessage(msg, action_elems, stanza);
   1764   return true;
   1765 }
   1766 
   1767 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
          // Emits an IQ acknowledgement for |stanza|: a new IQ element addressed to
          // remote_name(), carrying the same id attribute as |stanza| and type
          // "result", sent through SignalOutgoingMessage. |stanza| is dereferenced
          // without a null check.
   1768   rtc::scoped_ptr<buzz::XmlElement> ack(
   1769       new buzz::XmlElement(buzz::QN_IQ));
   1770   ack->SetAttr(buzz::QN_TO, remote_name());
   1771   ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
   1772   ack->SetAttr(buzz::QN_TYPE, "result");
   1773 
   1774   SignalOutgoingMessage(this, ack.get());
   1775 }
   1776 
   1777 }  // namespace cricket
   1778