Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/p2p/base/session.h"
     29 
     30 #include "talk/base/bind.h"
     31 #include "talk/base/common.h"
     32 #include "talk/base/logging.h"
     33 #include "talk/base/helpers.h"
     34 #include "talk/base/scoped_ptr.h"
     35 #include "talk/base/sslstreamadapter.h"
     36 #include "talk/xmpp/constants.h"
     37 #include "talk/xmpp/jid.h"
     38 #include "talk/p2p/base/dtlstransport.h"
     39 #include "talk/p2p/base/p2ptransport.h"
     40 #include "talk/p2p/base/sessionclient.h"
     41 #include "talk/p2p/base/transport.h"
     42 #include "talk/p2p/base/transportchannelproxy.h"
     43 #include "talk/p2p/base/transportinfo.h"
     44 
     45 #include "talk/p2p/base/constants.h"
     46 
     47 namespace cricket {
     48 
     49 using talk_base::Bind;
     50 
     51 bool BadMessage(const buzz::QName type,
     52                 const std::string& text,
     53                 MessageError* err) {
     54   err->SetType(type);
     55   err->SetText(text);
     56   return false;
     57 }
     58 
     59 TransportProxy::~TransportProxy() {
     60   for (ChannelMap::iterator iter = channels_.begin();
     61        iter != channels_.end(); ++iter) {
     62     iter->second->SignalDestroyed(iter->second);
     63     delete iter->second;
     64   }
     65 }
     66 
     67 std::string TransportProxy::type() const {
     68   return transport_->get()->type();
     69 }
     70 
     71 TransportChannel* TransportProxy::GetChannel(int component) {
     72   ASSERT(talk_base::Thread::Current() == worker_thread_);
     73   return GetChannelProxy(component);
     74 }
     75 
     76 TransportChannel* TransportProxy::CreateChannel(
     77     const std::string& name, int component) {
     78   ASSERT(talk_base::Thread::Current() == worker_thread_);
     79   ASSERT(GetChannel(component) == NULL);
     80   ASSERT(!transport_->get()->HasChannel(component));
     81 
     82   // We always create a proxy in case we need to change out the transport later.
     83   TransportChannelProxy* channel =
     84       new TransportChannelProxy(content_name(), name, component);
     85   channels_[component] = channel;
     86 
     87   // If we're already negotiated, create an impl and hook it up to the proxy
     88   // channel. If we're connecting, create an impl but don't hook it up yet.
     89   if (negotiated_) {
     90     SetupChannelProxy_w(component, channel);
     91   } else if (connecting_) {
     92     GetOrCreateChannelProxyImpl_w(component);
     93   }
     94   return channel;
     95 }
     96 
     97 bool TransportProxy::HasChannel(int component) {
     98   return transport_->get()->HasChannel(component);
     99 }
    100 
    101 void TransportProxy::DestroyChannel(int component) {
    102   ASSERT(talk_base::Thread::Current() == worker_thread_);
    103   TransportChannel* channel = GetChannel(component);
    104   if (channel) {
    105     // If the state of TransportProxy is not NEGOTIATED
    106     // then TransportChannelProxy and its impl are not
    107     // connected. Both must be connected before
    108     // deletion.
    109     if (!negotiated_) {
    110       SetupChannelProxy_w(component, GetChannelProxy(component));
    111     }
    112 
    113     channels_.erase(component);
    114     channel->SignalDestroyed(channel);
    115     delete channel;
    116   }
    117 }
    118 
    119 void TransportProxy::ConnectChannels() {
    120   if (!connecting_) {
    121     if (!negotiated_) {
    122       for (ChannelMap::iterator iter = channels_.begin();
    123            iter != channels_.end(); ++iter) {
    124         GetOrCreateChannelProxyImpl(iter->first);
    125       }
    126     }
    127     connecting_ = true;
    128   }
    129   // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
    130   // don't have any channels yet, so we need to allow this method to be called
    131   // multiple times. Once we fix Transport, we can move this call inside the
    132   // if (!connecting_) block.
    133   transport_->get()->ConnectChannels();
    134 }
    135 
    136 void TransportProxy::CompleteNegotiation() {
    137   if (!negotiated_) {
    138     for (ChannelMap::iterator iter = channels_.begin();
    139          iter != channels_.end(); ++iter) {
    140       SetupChannelProxy(iter->first, iter->second);
    141     }
    142     negotiated_ = true;
    143   }
    144 }
    145 
    146 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
    147   for (Candidates::const_iterator cand = candidates.begin();
    148        cand != candidates.end(); ++cand) {
    149     sent_candidates_.push_back(*cand);
    150   }
    151 }
    152 
    153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
    154   for (Candidates::const_iterator cand = candidates.begin();
    155        cand != candidates.end(); ++cand) {
    156     unsent_candidates_.push_back(*cand);
    157   }
    158 }
    159 
    160 bool TransportProxy::GetChannelNameFromComponent(
    161     int component, std::string* channel_name) const {
    162   const TransportChannelProxy* channel = GetChannelProxy(component);
    163   if (channel == NULL) {
    164     return false;
    165   }
    166 
    167   *channel_name = channel->name();
    168   return true;
    169 }
    170 
    171 bool TransportProxy::GetComponentFromChannelName(
    172     const std::string& channel_name, int* component) const {
    173   const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
    174   if (channel == NULL) {
    175     return false;
    176   }
    177 
    178   *component = channel->component();
    179   return true;
    180 }
    181 
    182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
    183   ChannelMap::const_iterator iter = channels_.find(component);
    184   return (iter != channels_.end()) ? iter->second : NULL;
    185 }
    186 
    187 TransportChannelProxy* TransportProxy::GetChannelProxyByName(
    188     const std::string& name) const {
    189   for (ChannelMap::const_iterator iter = channels_.begin();
    190        iter != channels_.end();
    191        ++iter) {
    192     if (iter->second->name() == name) {
    193       return iter->second;
    194     }
    195   }
    196   return NULL;
    197 }
    198 
    199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
    200     int component) {
    201   return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
    202       &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
    203 }
    204 
    205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w(
    206     int component) {
    207   ASSERT(talk_base::Thread::Current() == worker_thread_);
    208   TransportChannelImpl* impl = transport_->get()->GetChannel(component);
    209   if (impl == NULL) {
    210     impl = transport_->get()->CreateChannel(component);
    211   }
    212   return impl;
    213 }
    214 
    215 void TransportProxy::SetupChannelProxy(
    216     int component, TransportChannelProxy* transproxy) {
    217   worker_thread_->Invoke<void>(Bind(
    218       &TransportProxy::SetupChannelProxy_w, this, component, transproxy));
    219 }
    220 
    221 void TransportProxy::SetupChannelProxy_w(
    222     int component, TransportChannelProxy* transproxy) {
    223   ASSERT(talk_base::Thread::Current() == worker_thread_);
    224   TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
    225   ASSERT(impl != NULL);
    226   transproxy->SetImplementation(impl);
    227 }
    228 
    229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
    230                                              TransportChannelImpl* impl) {
    231   worker_thread_->Invoke<void>(Bind(
    232       &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl));
    233 }
    234 
    235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy,
    236                                                TransportChannelImpl* impl) {
    237   ASSERT(talk_base::Thread::Current() == worker_thread_);
    238   ASSERT(proxy != NULL);
    239   proxy->SetImplementation(impl);
    240 }
    241 
    242 // This function muxes |this| onto |target| by repointing |this| at
    243 // |target|'s transport and setting our TransportChannelProxies
    244 // to point to |target|'s underlying implementations.
    245 bool TransportProxy::SetupMux(TransportProxy* target) {
    246   // Bail out if there's nothing to do.
    247   if (transport_ == target->transport_) {
    248     return true;
    249   }
    250 
    251   // Run through all channels and remove any non-rtp transport channels before
    252   // setting target transport channels.
    253   for (ChannelMap::const_iterator iter = channels_.begin();
    254        iter != channels_.end(); ++iter) {
    255     if (!target->transport_->get()->HasChannel(iter->first)) {
    256       // Remove if channel doesn't exist in |transport_|.
    257       ReplaceChannelProxyImpl(iter->second, NULL);
    258     } else {
    259       // Replace the impl for all the TransportProxyChannels with the channels
    260       // from |target|'s transport. Fail if there's not an exact match.
    261       ReplaceChannelProxyImpl(
    262           iter->second, target->transport_->get()->CreateChannel(iter->first));
    263     }
    264   }
    265 
    266   // Now replace our transport. Must happen afterwards because
    267   // it deletes all impls as a side effect.
    268   transport_ = target->transport_;
    269   transport_->get()->SignalCandidatesReady.connect(
    270       this, &TransportProxy::OnTransportCandidatesReady);
    271   set_candidates_allocated(target->candidates_allocated());
    272   return true;
    273 }
    274 
    275 void TransportProxy::SetIceRole(IceRole role) {
    276   transport_->get()->SetIceRole(role);
    277 }
    278 
    279 bool TransportProxy::SetLocalTransportDescription(
    280     const TransportDescription& description,
    281     ContentAction action,
    282     std::string* error_desc) {
    283   // If this is an answer, finalize the negotiation.
    284   if (action == CA_ANSWER) {
    285     CompleteNegotiation();
    286   }
    287   bool result = transport_->get()->SetLocalTransportDescription(description,
    288                                                                 action,
    289                                                                 error_desc);
    290   if (result)
    291     local_description_set_ = true;
    292   return result;
    293 }
    294 
    295 bool TransportProxy::SetRemoteTransportDescription(
    296     const TransportDescription& description,
    297     ContentAction action,
    298     std::string* error_desc) {
    299   // If this is an answer, finalize the negotiation.
    300   if (action == CA_ANSWER) {
    301     CompleteNegotiation();
    302   }
    303   bool result = transport_->get()->SetRemoteTransportDescription(description,
    304                                                                  action,
    305                                                                  error_desc);
    306   if (result)
    307     remote_description_set_ = true;
    308   return result;
    309 }
    310 
    311 void TransportProxy::OnSignalingReady() {
    312   // If we're starting a new allocation sequence, reset our state.
    313   set_candidates_allocated(false);
    314   transport_->get()->OnSignalingReady();
    315 }
    316 
    317 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
    318                                         std::string* error) {
    319   // Ensure the transport is negotiated before handling candidates.
    320   // TODO(juberti): Remove this once everybody calls SetLocalTD.
    321   CompleteNegotiation();
    322 
    323   // Verify each candidate before passing down to transport layer.
    324   for (Candidates::const_iterator cand = candidates.begin();
    325        cand != candidates.end(); ++cand) {
    326     if (!transport_->get()->VerifyCandidate(*cand, error))
    327       return false;
    328     if (!HasChannel(cand->component())) {
    329       *error = "Candidate has unknown component: " + cand->ToString() +
    330                " for content: " + content_name_;
    331       return false;
    332     }
    333   }
    334   transport_->get()->OnRemoteCandidates(candidates);
    335   return true;
    336 }
    337 
    338 void TransportProxy::SetIdentity(
    339     talk_base::SSLIdentity* identity) {
    340   transport_->get()->SetIdentity(identity);
    341 }
    342 
    343 std::string BaseSession::StateToString(State state) {
    344   switch (state) {
    345     case Session::STATE_INIT:
    346       return "STATE_INIT";
    347     case Session::STATE_SENTINITIATE:
    348       return "STATE_SENTINITIATE";
    349     case Session::STATE_RECEIVEDINITIATE:
    350       return "STATE_RECEIVEDINITIATE";
    351     case Session::STATE_SENTPRACCEPT:
    352       return "STATE_SENTPRACCEPT";
    353     case Session::STATE_SENTACCEPT:
    354       return "STATE_SENTACCEPT";
    355     case Session::STATE_RECEIVEDPRACCEPT:
    356       return "STATE_RECEIVEDPRACCEPT";
    357     case Session::STATE_RECEIVEDACCEPT:
    358       return "STATE_RECEIVEDACCEPT";
    359     case Session::STATE_SENTMODIFY:
    360       return "STATE_SENTMODIFY";
    361     case Session::STATE_RECEIVEDMODIFY:
    362       return "STATE_RECEIVEDMODIFY";
    363     case Session::STATE_SENTREJECT:
    364       return "STATE_SENTREJECT";
    365     case Session::STATE_RECEIVEDREJECT:
    366       return "STATE_RECEIVEDREJECT";
    367     case Session::STATE_SENTREDIRECT:
    368       return "STATE_SENTREDIRECT";
    369     case Session::STATE_SENTTERMINATE:
    370       return "STATE_SENTTERMINATE";
    371     case Session::STATE_RECEIVEDTERMINATE:
    372       return "STATE_RECEIVEDTERMINATE";
    373     case Session::STATE_INPROGRESS:
    374       return "STATE_INPROGRESS";
    375     case Session::STATE_DEINIT:
    376       return "STATE_DEINIT";
    377     default:
    378       break;
    379   }
    380   return "STATE_" + talk_base::ToString(state);
    381 }
    382 
    383 BaseSession::BaseSession(talk_base::Thread* signaling_thread,
    384                          talk_base::Thread* worker_thread,
    385                          PortAllocator* port_allocator,
    386                          const std::string& sid,
    387                          const std::string& content_type,
    388                          bool initiator)
    389     : state_(STATE_INIT),
    390       error_(ERROR_NONE),
    391       signaling_thread_(signaling_thread),
    392       worker_thread_(worker_thread),
    393       port_allocator_(port_allocator),
    394       sid_(sid),
    395       content_type_(content_type),
    396       transport_type_(NS_GINGLE_P2P),
    397       initiator_(initiator),
    398       identity_(NULL),
    399       local_description_(NULL),
    400       remote_description_(NULL),
    401       ice_tiebreaker_(talk_base::CreateRandomId64()),
    402       role_switch_(false) {
    403   ASSERT(signaling_thread->IsCurrent());
    404 }
    405 
    406 BaseSession::~BaseSession() {
    407   ASSERT(signaling_thread()->IsCurrent());
    408 
    409   ASSERT(state_ != STATE_DEINIT);
    410   LogState(state_, STATE_DEINIT);
    411   state_ = STATE_DEINIT;
    412   SignalState(this, state_);
    413 
    414   for (TransportMap::iterator iter = transports_.begin();
    415        iter != transports_.end(); ++iter) {
    416     delete iter->second;
    417   }
    418 
    419   delete remote_description_;
    420   delete local_description_;
    421 }
    422 
    423 bool BaseSession::SetIdentity(talk_base::SSLIdentity* identity) {
    424   if (identity_)
    425     return false;
    426   identity_ = identity;
    427   for (TransportMap::iterator iter = transports_.begin();
    428        iter != transports_.end(); ++iter) {
    429     iter->second->SetIdentity(identity_);
    430   }
    431   return true;
    432 }
    433 
    434 bool BaseSession::PushdownTransportDescription(ContentSource source,
    435                                                ContentAction action,
    436                                                std::string* error_desc) {
    437   if (source == CS_LOCAL) {
    438     return PushdownLocalTransportDescription(local_description_,
    439                                              action,
    440                                              error_desc);
    441   }
    442   return PushdownRemoteTransportDescription(remote_description_,
    443                                             action,
    444                                             error_desc);
    445 }
    446 
    447 bool BaseSession::PushdownLocalTransportDescription(
    448     const SessionDescription* sdesc,
    449     ContentAction action,
    450     std::string* error_desc) {
    451   // Update the Transports with the right information, and trigger them to
    452   // start connecting.
    453   for (TransportMap::iterator iter = transports_.begin();
    454        iter != transports_.end(); ++iter) {
    455     // If no transport info was in this session description, ret == false
    456     // and we just skip this one.
    457     TransportDescription tdesc;
    458     bool ret = GetTransportDescription(
    459         sdesc, iter->second->content_name(), &tdesc);
    460     if (ret) {
    461       if (!iter->second->SetLocalTransportDescription(tdesc, action,
    462                                                       error_desc)) {
    463         return false;
    464       }
    465 
    466       iter->second->ConnectChannels();
    467     }
    468   }
    469 
    470   return true;
    471 }
    472 
    473 bool BaseSession::PushdownRemoteTransportDescription(
    474     const SessionDescription* sdesc,
    475     ContentAction action,
    476     std::string* error_desc) {
    477   // Update the Transports with the right information.
    478   for (TransportMap::iterator iter = transports_.begin();
    479        iter != transports_.end(); ++iter) {
    480     TransportDescription tdesc;
    481 
    482     // If no transport info was in this session description, ret == false
    483     // and we just skip this one.
    484     bool ret = GetTransportDescription(
    485         sdesc, iter->second->content_name(), &tdesc);
    486     if (ret) {
    487       if (!iter->second->SetRemoteTransportDescription(tdesc, action,
    488                                                        error_desc)) {
    489         return false;
    490       }
    491     }
    492   }
    493 
    494   return true;
    495 }
    496 
    497 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
    498                                              const std::string& channel_name,
    499                                              int component) {
    500   // We create the proxy "on demand" here because we need to support
    501   // creating channels at any time, even before we send or receive
    502   // initiate messages, which is before we create the transports.
    503   TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
    504   return transproxy->CreateChannel(channel_name, component);
    505 }
    506 
    507 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
    508                                           int component) {
    509   TransportProxy* transproxy = GetTransportProxy(content_name);
    510   if (transproxy == NULL)
    511     return NULL;
    512   else
    513     return transproxy->GetChannel(component);
    514 }
    515 
    516 void BaseSession::DestroyChannel(const std::string& content_name,
    517                                  int component) {
    518   TransportProxy* transproxy = GetTransportProxy(content_name);
    519   ASSERT(transproxy != NULL);
    520   transproxy->DestroyChannel(component);
    521 }
    522 
    523 TransportProxy* BaseSession::GetOrCreateTransportProxy(
    524     const std::string& content_name) {
    525   TransportProxy* transproxy = GetTransportProxy(content_name);
    526   if (transproxy)
    527     return transproxy;
    528 
    529   Transport* transport = CreateTransport(content_name);
    530   transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
    531   transport->SetIceTiebreaker(ice_tiebreaker_);
    532   // TODO: Connect all the Transport signals to TransportProxy
    533   // then to the BaseSession.
    534   transport->SignalConnecting.connect(
    535       this, &BaseSession::OnTransportConnecting);
    536   transport->SignalWritableState.connect(
    537       this, &BaseSession::OnTransportWritable);
    538   transport->SignalRequestSignaling.connect(
    539       this, &BaseSession::OnTransportRequestSignaling);
    540   transport->SignalTransportError.connect(
    541       this, &BaseSession::OnTransportSendError);
    542   transport->SignalRouteChange.connect(
    543       this, &BaseSession::OnTransportRouteChange);
    544   transport->SignalCandidatesAllocationDone.connect(
    545       this, &BaseSession::OnTransportCandidatesAllocationDone);
    546   transport->SignalRoleConflict.connect(
    547       this, &BaseSession::OnRoleConflict);
    548   transport->SignalCompleted.connect(
    549       this, &BaseSession::OnTransportCompleted);
    550   transport->SignalFailed.connect(
    551       this, &BaseSession::OnTransportFailed);
    552 
    553   transproxy = new TransportProxy(worker_thread_, sid_, content_name,
    554                                   new TransportWrapper(transport));
    555   transproxy->SignalCandidatesReady.connect(
    556       this, &BaseSession::OnTransportProxyCandidatesReady);
    557   if (identity_)
    558     transproxy->SetIdentity(identity_);
    559   transports_[content_name] = transproxy;
    560 
    561   return transproxy;
    562 }
    563 
    564 Transport* BaseSession::GetTransport(const std::string& content_name) {
    565   TransportProxy* transproxy = GetTransportProxy(content_name);
    566   if (transproxy == NULL)
    567     return NULL;
    568   return transproxy->impl();
    569 }
    570 
    571 TransportProxy* BaseSession::GetTransportProxy(
    572     const std::string& content_name) {
    573   TransportMap::iterator iter = transports_.find(content_name);
    574   return (iter != transports_.end()) ? iter->second : NULL;
    575 }
    576 
    577 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
    578   for (TransportMap::iterator iter = transports_.begin();
    579        iter != transports_.end(); ++iter) {
    580     TransportProxy* transproxy = iter->second;
    581     if (transproxy->impl() == transport) {
    582       return transproxy;
    583     }
    584   }
    585   return NULL;
    586 }
    587 
    588 TransportProxy* BaseSession::GetFirstTransportProxy() {
    589   if (transports_.empty())
    590     return NULL;
    591   return transports_.begin()->second;
    592 }
    593 
    594 void BaseSession::DestroyTransportProxy(
    595     const std::string& content_name) {
    596   TransportMap::iterator iter = transports_.find(content_name);
    597   if (iter != transports_.end()) {
    598     delete iter->second;
    599     transports_.erase(content_name);
    600   }
    601 }
    602 
    603 cricket::Transport* BaseSession::CreateTransport(
    604     const std::string& content_name) {
    605   ASSERT(transport_type_ == NS_GINGLE_P2P);
    606   return new cricket::DtlsTransport<P2PTransport>(
    607       signaling_thread(), worker_thread(), content_name,
    608       port_allocator(), identity_);
    609 }
    610 
    611 bool BaseSession::GetStats(SessionStats* stats) {
    612   for (TransportMap::iterator iter = transports_.begin();
    613        iter != transports_.end(); ++iter) {
    614     std::string proxy_id = iter->second->content_name();
    615     // We are ignoring not-yet-instantiated transports.
    616     if (iter->second->impl()) {
    617       std::string transport_id = iter->second->impl()->content_name();
    618       stats->proxy_to_transport[proxy_id] = transport_id;
    619       if (stats->transport_stats.find(transport_id)
    620           == stats->transport_stats.end()) {
    621         TransportStats subinfos;
    622         if (!iter->second->impl()->GetStats(&subinfos)) {
    623           return false;
    624         }
    625         stats->transport_stats[transport_id] = subinfos;
    626       }
    627     }
    628   }
    629   return true;
    630 }
    631 
    632 void BaseSession::SetState(State state) {
    633   ASSERT(signaling_thread_->IsCurrent());
    634   if (state != state_) {
    635     LogState(state_, state);
    636     state_ = state;
    637     SignalState(this, state_);
    638     signaling_thread_->Post(this, MSG_STATE);
    639   }
    640   SignalNewDescription();
    641 }
    642 
    643 void BaseSession::SetError(Error error, const std::string& error_desc) {
    644   ASSERT(signaling_thread_->IsCurrent());
    645   if (error != error_) {
    646     error_ = error;
    647     error_desc_ = error_desc;
    648     SignalError(this, error);
    649   }
    650 }
    651 
    652 void BaseSession::OnSignalingReady() {
    653   ASSERT(signaling_thread()->IsCurrent());
    654   for (TransportMap::iterator iter = transports_.begin();
    655        iter != transports_.end(); ++iter) {
    656     iter->second->OnSignalingReady();
    657   }
    658 }
    659 
    660 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to
    661 // start, remove this method once everyone calls PushdownLocalTD.
    662 void BaseSession::SpeculativelyConnectAllTransportChannels() {
    663   // Put all transports into the connecting state.
    664   for (TransportMap::iterator iter = transports_.begin();
    665        iter != transports_.end(); ++iter) {
    666     iter->second->ConnectChannels();
    667   }
    668 }
    669 
    670 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
    671                                      const Candidates& candidates,
    672                                      std::string* error) {
    673   // Give candidates to the appropriate transport, and tell that transport
    674   // to start connecting, if it's not already doing so.
    675   TransportProxy* transproxy = GetTransportProxy(content_name);
    676   if (!transproxy) {
    677     *error = "Unknown content name " + content_name;
    678     return false;
    679   }
    680   if (!transproxy->OnRemoteCandidates(candidates, error)) {
    681     return false;
    682   }
    683   // TODO(juberti): Remove this call once we can be sure that we always have
    684   // a local transport description (which will trigger the connection).
    685   transproxy->ConnectChannels();
    686   return true;
    687 }
    688 
    689 bool BaseSession::MaybeEnableMuxingSupport() {
    690   // We need both a local and remote description to decide if we should mux.
    691   if ((state_ == STATE_SENTINITIATE ||
    692       state_ == STATE_RECEIVEDINITIATE) &&
    693       ((local_description_ == NULL) ||
    694       (remote_description_ == NULL))) {
    695     return false;
    696   }
    697 
    698   // In order to perform the multiplexing, we need all proxies to be in the
    699   // negotiated state, i.e. to have implementations underneath.
    700   // Ensure that this is the case, regardless of whether we are going to mux.
    701   for (TransportMap::iterator iter = transports_.begin();
    702        iter != transports_.end(); ++iter) {
    703     ASSERT(iter->second->negotiated());
    704     if (!iter->second->negotiated())
    705       return false;
    706   }
    707 
    708   // If both sides agree to BUNDLE, mux all the specified contents onto the
    709   // transport belonging to the first content name in the BUNDLE group.
    710   // If the contents are already muxed, this will be a no-op.
    711   // TODO(juberti): Should this check that local and remote have configured
    712   // BUNDLE the same way?
    713   bool candidates_allocated = IsCandidateAllocationDone();
    714   const ContentGroup* local_bundle_group =
    715       local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
    716   const ContentGroup* remote_bundle_group =
    717       remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
    718   if (local_bundle_group && remote_bundle_group &&
    719       local_bundle_group->FirstContentName()) {
    720     const std::string* content_name = local_bundle_group->FirstContentName();
    721     const ContentInfo* content =
    722         local_description_->GetContentByName(*content_name);
    723     ASSERT(content != NULL);
    724     if (!SetSelectedProxy(content->name, local_bundle_group)) {
    725       LOG(LS_WARNING) << "Failed to set up BUNDLE";
    726       return false;
    727     }
    728 
    729     // If we weren't done gathering before, we might be done now, as a result
    730     // of enabling mux.
    731     LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
    732                  << *content_name;
    733     if (!candidates_allocated) {
    734       MaybeCandidateAllocationDone();
    735     }
    736   } else {
    737     LOG(LS_INFO) << "No BUNDLE information, not bundling.";
    738   }
    739   return true;
    740 }
    741 
    742 bool BaseSession::SetSelectedProxy(const std::string& content_name,
    743                                    const ContentGroup* muxed_group) {
    744   TransportProxy* selected_proxy = GetTransportProxy(content_name);
    745   if (!selected_proxy) {
    746     return false;
    747   }
    748 
    749   ASSERT(selected_proxy->negotiated());
    750   for (TransportMap::iterator iter = transports_.begin();
    751        iter != transports_.end(); ++iter) {
    752     // If content is part of the mux group, then repoint its proxy at the
    753     // transport object that we have chosen to mux onto. If the proxy
    754     // is already pointing at the right object, it will be a no-op.
    755     if (muxed_group->HasContentName(iter->first) &&
    756         !iter->second->SetupMux(selected_proxy)) {
    757       return false;
    758     }
    759   }
    760   return true;
    761 }
    762 
    763 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
    764   // TODO(juberti): This is a clunky way of processing the done signal. Instead,
    765   // TransportProxy should receive the done signal directly, set its allocated
    766   // flag internally, and then reissue the done signal to Session.
    767   // Overall we should make TransportProxy receive *all* the signals from
    768   // Transport, since this removes the need to manually iterate over all
    769   // the transports, as is needed to make sure signals are handled properly
    770   // when BUNDLEing.
    771   // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
    772   // that make it prohibitively difficult to run dbg builds. Disabled for now.
    773   //ASSERT(!IsCandidateAllocationDone());
    774   for (TransportMap::iterator iter = transports_.begin();
    775        iter != transports_.end(); ++iter) {
    776     if (iter->second->impl() == transport) {
    777       iter->second->set_candidates_allocated(true);
    778     }
    779   }
    780   MaybeCandidateAllocationDone();
    781 }
    782 
    783 bool BaseSession::IsCandidateAllocationDone() const {
    784   for (TransportMap::const_iterator iter = transports_.begin();
    785        iter != transports_.end(); ++iter) {
    786     if (!iter->second->candidates_allocated())
    787       return false;
    788   }
    789   return true;
    790 }
    791 
    792 void BaseSession::MaybeCandidateAllocationDone() {
    793   if (IsCandidateAllocationDone()) {
    794     LOG(LS_INFO) << "Candidate gathering is complete.";
    795     OnCandidatesAllocationDone();
    796   }
    797 }
    798 
    799 void BaseSession::OnRoleConflict() {
    800   if (role_switch_) {
    801     LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
    802     return;
    803   }
    804 
    805   role_switch_ = true;
    806   for (TransportMap::iterator iter = transports_.begin();
    807        iter != transports_.end(); ++iter) {
    808     // Role will be reverse of initial role setting.
    809     IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
    810     iter->second->SetIceRole(role);
    811   }
    812 }
    813 
    814 void BaseSession::LogState(State old_state, State new_state) {
    815   LOG(LS_INFO) << "Session:" << id()
    816                << " Old state:" << StateToString(old_state)
    817                << " New state:" << StateToString(new_state)
    818                << " Type:" << content_type()
    819                << " Transport:" << transport_type();
    820 }
    821 
    822 bool BaseSession::GetTransportDescription(const SessionDescription* description,
    823                                           const std::string& content_name,
    824                                           TransportDescription* tdesc) {
    825   if (!description || !tdesc) {
    826     return false;
    827   }
    828   const TransportInfo* transport_info =
    829       description->GetTransportInfoByName(content_name);
    830   if (!transport_info) {
    831     return false;
    832   }
    833   *tdesc = transport_info->description;
    834   return true;
    835 }
    836 
    837 void BaseSession::SignalNewDescription() {
    838   ContentAction action;
    839   ContentSource source;
    840   if (!GetContentAction(&action, &source)) {
    841     return;
    842   }
    843   if (source == CS_LOCAL) {
    844     SignalNewLocalDescription(this, action);
    845   } else {
    846     SignalNewRemoteDescription(this, action);
    847   }
    848 }
    849 
    850 bool BaseSession::GetContentAction(ContentAction* action,
    851                                    ContentSource* source) {
    852   switch (state_) {
    853     // new local description
    854     case STATE_SENTINITIATE:
    855       *action = CA_OFFER;
    856       *source = CS_LOCAL;
    857       break;
    858     case STATE_SENTPRACCEPT:
    859       *action = CA_PRANSWER;
    860       *source = CS_LOCAL;
    861       break;
    862     case STATE_SENTACCEPT:
    863       *action = CA_ANSWER;
    864       *source = CS_LOCAL;
    865       break;
    866     // new remote description
    867     case STATE_RECEIVEDINITIATE:
    868       *action = CA_OFFER;
    869       *source = CS_REMOTE;
    870       break;
    871     case STATE_RECEIVEDPRACCEPT:
    872       *action = CA_PRANSWER;
    873       *source = CS_REMOTE;
    874       break;
    875     case STATE_RECEIVEDACCEPT:
    876       *action = CA_ANSWER;
    877       *source = CS_REMOTE;
    878       break;
    879     default:
    880       return false;
    881   }
    882   return true;
    883 }
    884 
    885 void BaseSession::OnMessage(talk_base::Message *pmsg) {
    886   switch (pmsg->message_id) {
    887   case MSG_TIMEOUT:
    888     // Session timeout has occured.
    889     SetError(ERROR_TIME, "Session timeout has occured.");
    890     break;
    891 
    892   case MSG_STATE:
    893     switch (state_) {
    894     case STATE_SENTACCEPT:
    895     case STATE_RECEIVEDACCEPT:
    896       SetState(STATE_INPROGRESS);
    897       break;
    898 
    899     default:
    900       // Explicitly ignoring some states here.
    901       break;
    902     }
    903     break;
    904   }
    905 }
    906 
    907 Session::Session(SessionManager* session_manager,
    908                  const std::string& local_name,
    909                  const std::string& initiator_name,
    910                  const std::string& sid,
    911                  const std::string& content_type,
    912                  SessionClient* client)
    913     : BaseSession(session_manager->signaling_thread(),
    914                   session_manager->worker_thread(),
    915                   session_manager->port_allocator(),
    916                   sid, content_type, initiator_name == local_name) {
    917   ASSERT(client != NULL);
    918   session_manager_ = session_manager;
    919   local_name_ = local_name;
    920   initiator_name_ = initiator_name;
    921   transport_parser_ = new P2PTransportParser();
    922   client_ = client;
    923   initiate_acked_ = false;
    924   current_protocol_ = PROTOCOL_HYBRID;
    925 }
    926 
    927 Session::~Session() {
    928   delete transport_parser_;
    929 }
    930 
    931 bool Session::Initiate(const std::string &to,
    932                        const SessionDescription* sdesc) {
    933   ASSERT(signaling_thread()->IsCurrent());
    934   SessionError error;
    935 
    936   // Only from STATE_INIT
    937   if (state() != STATE_INIT)
    938     return false;
    939 
    940   // Setup for signaling.
    941   set_remote_name(to);
    942   set_local_description(sdesc);
    943   if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
    944                               &error)) {
    945     LOG(LS_ERROR) << "Could not create transports: " << error.text;
    946     return false;
    947   }
    948 
    949   if (!SendInitiateMessage(sdesc, &error)) {
    950     LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
    951     return false;
    952   }
    953 
    954   // We need to connect transport proxy and impl here so that we can process
    955   // the TransportDescriptions.
    956   SpeculativelyConnectAllTransportChannels();
    957 
    958   PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL);
    959   SetState(Session::STATE_SENTINITIATE);
    960   return true;
    961 }
    962 
    963 bool Session::Accept(const SessionDescription* sdesc) {
    964   ASSERT(signaling_thread()->IsCurrent());
    965 
    966   // Only if just received initiate
    967   if (state() != STATE_RECEIVEDINITIATE)
    968     return false;
    969 
    970   // Setup for signaling.
    971   set_local_description(sdesc);
    972 
    973   SessionError error;
    974   if (!SendAcceptMessage(sdesc, &error)) {
    975     LOG(LS_ERROR) << "Could not send accept message: " << error.text;
    976     return false;
    977   }
    978   // TODO(juberti): Add BUNDLE support to transport-info messages.
    979   PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL);
    980   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
    981   SetState(Session::STATE_SENTACCEPT);
    982   return true;
    983 }
    984 
    985 bool Session::Reject(const std::string& reason) {
    986   ASSERT(signaling_thread()->IsCurrent());
    987 
    988   // Reject is sent in response to an initiate or modify, to reject the
    989   // request
    990   if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
    991     return false;
    992 
    993   SessionError error;
    994   if (!SendRejectMessage(reason, &error)) {
    995     LOG(LS_ERROR) << "Could not send reject message: " << error.text;
    996     return false;
    997   }
    998 
    999   SetState(STATE_SENTREJECT);
   1000   return true;
   1001 }
   1002 
   1003 bool Session::TerminateWithReason(const std::string& reason) {
   1004   ASSERT(signaling_thread()->IsCurrent());
   1005 
   1006   // Either side can terminate, at any time.
   1007   switch (state()) {
   1008     case STATE_SENTTERMINATE:
   1009     case STATE_RECEIVEDTERMINATE:
   1010       return false;
   1011 
   1012     case STATE_SENTREJECT:
   1013     case STATE_RECEIVEDREJECT:
   1014       // We don't need to send terminate if we sent or received a reject...
   1015       // it's implicit.
   1016       break;
   1017 
   1018     default:
   1019       SessionError error;
   1020       if (!SendTerminateMessage(reason, &error)) {
   1021         LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
   1022         return false;
   1023       }
   1024       break;
   1025   }
   1026 
   1027   SetState(STATE_SENTTERMINATE);
   1028   return true;
   1029 }
   1030 
   1031 bool Session::SendInfoMessage(const XmlElements& elems,
   1032                               const std::string& remote_name) {
   1033   ASSERT(signaling_thread()->IsCurrent());
   1034   SessionError error;
   1035   if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
   1036     LOG(LS_ERROR) << "Could not send info message " << error.text;
   1037     return false;
   1038   }
   1039   return true;
   1040 }
   1041 
   1042 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
   1043   XmlElements elems;
   1044   WriteError write_error;
   1045   if (!WriteDescriptionInfo(current_protocol_,
   1046                             contents,
   1047                             GetContentParsers(),
   1048                             &elems, &write_error)) {
   1049     LOG(LS_ERROR) << "Could not write description info message: "
   1050                   << write_error.text;
   1051     return false;
   1052   }
   1053   SessionError error;
   1054   if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
   1055     LOG(LS_ERROR) << "Could not send description info message: "
   1056                   << error.text;
   1057     return false;
   1058   }
   1059   return true;
   1060 }
   1061 
   1062 TransportInfos Session::GetEmptyTransportInfos(
   1063     const ContentInfos& contents) const {
   1064   TransportInfos tinfos;
   1065   for (ContentInfos::const_iterator content = contents.begin();
   1066        content != contents.end(); ++content) {
   1067     tinfos.push_back(TransportInfo(content->name,
   1068                                    TransportDescription(transport_type(),
   1069                                                         std::string(),
   1070                                                         std::string())));
   1071   }
   1072   return tinfos;
   1073 }
   1074 
   1075 bool Session::OnRemoteCandidates(
   1076     const TransportInfos& tinfos, ParseError* error) {
   1077   for (TransportInfos::const_iterator tinfo = tinfos.begin();
   1078        tinfo != tinfos.end(); ++tinfo) {
   1079     std::string str_error;
   1080     if (!BaseSession::OnRemoteCandidates(
   1081         tinfo->content_name, tinfo->description.candidates, &str_error)) {
   1082       return BadParse(str_error, error);
   1083     }
   1084   }
   1085   return true;
   1086 }
   1087 
   1088 bool Session::CreateTransportProxies(const TransportInfos& tinfos,
   1089                                      SessionError* error) {
   1090   for (TransportInfos::const_iterator tinfo = tinfos.begin();
   1091        tinfo != tinfos.end(); ++tinfo) {
   1092     if (tinfo->description.transport_type != transport_type()) {
   1093       error->SetText("No supported transport in offer.");
   1094       return false;
   1095     }
   1096 
   1097     GetOrCreateTransportProxy(tinfo->content_name);
   1098   }
   1099   return true;
   1100 }
   1101 
   1102 TransportParserMap Session::GetTransportParsers() {
   1103   TransportParserMap parsers;
   1104   parsers[transport_type()] = transport_parser_;
   1105   return parsers;
   1106 }
   1107 
   1108 CandidateTranslatorMap Session::GetCandidateTranslators() {
   1109   CandidateTranslatorMap translators;
   1110   // NOTE: This technique makes it impossible to parse G-ICE
   1111   // candidates in session-initiate messages because the channels
   1112   // aren't yet created at that point.  Since we don't use candidates
   1113   // in session-initiate messages, we should be OK.  Once we switch to
   1114   // ICE, this translation shouldn't be necessary.
   1115   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1116        iter != transport_proxies().end(); ++iter) {
   1117     translators[iter->first] = iter->second;
   1118   }
   1119   return translators;
   1120 }
   1121 
   1122 ContentParserMap Session::GetContentParsers() {
   1123   ContentParserMap parsers;
   1124   parsers[content_type()] = client_;
   1125   // We need to be able parse both RTP-based and SCTP-based Jingle
   1126   // with the same client.
   1127   if (content_type() == NS_JINGLE_RTP) {
   1128     parsers[NS_JINGLE_DRAFT_SCTP] = client_;
   1129   }
   1130   return parsers;
   1131 }
   1132 
   1133 void Session::OnTransportRequestSignaling(Transport* transport) {
   1134   ASSERT(signaling_thread()->IsCurrent());
   1135   TransportProxy* transproxy = GetTransportProxy(transport);
   1136   ASSERT(transproxy != NULL);
   1137   if (transproxy) {
   1138     // Reset candidate allocation status for the transport proxy.
   1139     transproxy->set_candidates_allocated(false);
   1140   }
   1141   SignalRequestSignaling(this);
   1142 }
   1143 
   1144 void Session::OnTransportConnecting(Transport* transport) {
   1145   // This is an indication that we should begin watching the writability
   1146   // state of the transport.
   1147   OnTransportWritable(transport);
   1148 }
   1149 
   1150 void Session::OnTransportWritable(Transport* transport) {
   1151   ASSERT(signaling_thread()->IsCurrent());
   1152 
   1153   // If the transport is not writable, start a timer to make sure that it
   1154   // becomes writable within a reasonable amount of time.  If it does not, we
   1155   // terminate since we can't actually send data.  If the transport is writable,
   1156   // cancel the timer.  Note that writability transitions may occur repeatedly
   1157   // during the lifetime of the session.
   1158   signaling_thread()->Clear(this, MSG_TIMEOUT);
   1159   if (transport->HasChannels() && !transport->writable()) {
   1160     signaling_thread()->PostDelayed(
   1161         session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
   1162   }
   1163 }
   1164 
   1165 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
   1166                                               const Candidates& candidates) {
   1167   ASSERT(signaling_thread()->IsCurrent());
   1168   if (transproxy != NULL) {
   1169     if (initiator() && !initiate_acked_) {
   1170       // TODO: This is to work around server re-ordering
   1171       // messages.  We send the candidates once the session-initiate
   1172       // is acked.  Once we have fixed the server to guarantee message
   1173       // order, we can remove this case.
   1174       transproxy->AddUnsentCandidates(candidates);
   1175     } else {
   1176       if (!transproxy->negotiated()) {
   1177         transproxy->AddSentCandidates(candidates);
   1178       }
   1179       SessionError error;
   1180       if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
   1181         LOG(LS_ERROR) << "Could not send transport info message: "
   1182                       << error.text;
   1183         return;
   1184       }
   1185     }
   1186   }
   1187 }
   1188 
   1189 void Session::OnTransportSendError(Transport* transport,
   1190                                    const buzz::XmlElement* stanza,
   1191                                    const buzz::QName& name,
   1192                                    const std::string& type,
   1193                                    const std::string& text,
   1194                                    const buzz::XmlElement* extra_info) {
   1195   ASSERT(signaling_thread()->IsCurrent());
   1196   SignalErrorMessage(this, stanza, name, type, text, extra_info);
   1197 }
   1198 
   1199 void Session::OnIncomingMessage(const SessionMessage& msg) {
   1200   ASSERT(signaling_thread()->IsCurrent());
   1201   ASSERT(state() == STATE_INIT || msg.from == remote_name());
   1202 
   1203   if (current_protocol_== PROTOCOL_HYBRID) {
   1204     if (msg.protocol == PROTOCOL_GINGLE) {
   1205       current_protocol_ = PROTOCOL_GINGLE;
   1206     } else {
   1207       current_protocol_ = PROTOCOL_JINGLE;
   1208     }
   1209   }
   1210 
   1211   bool valid = false;
   1212   MessageError error;
   1213   switch (msg.type) {
   1214     case ACTION_SESSION_INITIATE:
   1215       valid = OnInitiateMessage(msg, &error);
   1216       break;
   1217     case ACTION_SESSION_INFO:
   1218       valid = OnInfoMessage(msg);
   1219       break;
   1220     case ACTION_SESSION_ACCEPT:
   1221       valid = OnAcceptMessage(msg, &error);
   1222       break;
   1223     case ACTION_SESSION_REJECT:
   1224       valid = OnRejectMessage(msg, &error);
   1225       break;
   1226     case ACTION_SESSION_TERMINATE:
   1227       valid = OnTerminateMessage(msg, &error);
   1228       break;
   1229     case ACTION_TRANSPORT_INFO:
   1230       valid = OnTransportInfoMessage(msg, &error);
   1231       break;
   1232     case ACTION_TRANSPORT_ACCEPT:
   1233       valid = OnTransportAcceptMessage(msg, &error);
   1234       break;
   1235     case ACTION_DESCRIPTION_INFO:
   1236       valid = OnDescriptionInfoMessage(msg, &error);
   1237       break;
   1238     default:
   1239       valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
   1240                          "unknown session message type",
   1241                          &error);
   1242   }
   1243 
   1244   if (valid) {
   1245     SendAcknowledgementMessage(msg.stanza);
   1246   } else {
   1247     SignalErrorMessage(this, msg.stanza, error.type,
   1248                        "modify", error.text, NULL);
   1249   }
   1250 }
   1251 
   1252 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
   1253                                  const buzz::XmlElement* response_stanza,
   1254                                  const SessionMessage& msg) {
   1255   ASSERT(signaling_thread()->IsCurrent());
   1256 
   1257   if (msg.type == ACTION_SESSION_INITIATE) {
   1258     OnInitiateAcked();
   1259   }
   1260 }
   1261 
   1262 void Session::OnInitiateAcked() {
   1263     // TODO: This is to work around server re-ordering
   1264     // messages.  We send the candidates once the session-initiate
   1265     // is acked.  Once we have fixed the server to guarantee message
   1266     // order, we can remove this case.
   1267   if (!initiate_acked_) {
   1268     initiate_acked_ = true;
   1269     SessionError error;
   1270     SendAllUnsentTransportInfoMessages(&error);
   1271   }
   1272 }
   1273 
   1274 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
   1275                            const buzz::XmlElement* error_stanza) {
   1276   ASSERT(signaling_thread()->IsCurrent());
   1277 
   1278   SessionMessage msg;
   1279   ParseError parse_error;
   1280   if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
   1281     LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
   1282                   << ":" << orig_stanza;
   1283     return;
   1284   }
   1285 
   1286   // If the error is a session redirect, call OnRedirectError, which will
   1287   // continue the session with a new remote JID.
   1288   SessionRedirect redirect;
   1289   if (FindSessionRedirect(error_stanza, &redirect)) {
   1290     SessionError error;
   1291     if (!OnRedirectError(redirect, &error)) {
   1292       // TODO: Should we send a message back?  The standard
   1293       // says nothing about it.
   1294       std::ostringstream desc;
   1295       desc << "Failed to redirect: " << error.text;
   1296       LOG(LS_ERROR) << desc.str();
   1297       SetError(ERROR_RESPONSE, desc.str());
   1298     }
   1299     return;
   1300   }
   1301 
   1302   std::string error_type = "cancel";
   1303 
   1304   const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
   1305   if (error) {
   1306     error_type = error->Attr(buzz::QN_TYPE);
   1307 
   1308     LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
   1309                   << "in response to:\n" << orig_stanza->Str();
   1310   } else {
   1311     // don't crash if <error> is missing
   1312     LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
   1313     return;
   1314   }
   1315 
   1316   if (msg.type == ACTION_TRANSPORT_INFO) {
   1317     // Transport messages frequently generate errors because they are sent right
   1318     // when we detect a network failure.  For that reason, we ignore such
   1319     // errors, because if we do not establish writability again, we will
   1320     // terminate anyway.  The exceptions are transport-specific error tags,
   1321     // which we pass on to the respective transport.
   1322   } else if ((error_type != "continue") && (error_type != "wait")) {
   1323     // We do not set an error if the other side said it is okay to continue
   1324     // (possibly after waiting).  These errors can be ignored.
   1325     SetError(ERROR_RESPONSE, "");
   1326   }
   1327 }
   1328 
   1329 bool Session::OnInitiateMessage(const SessionMessage& msg,
   1330                                 MessageError* error) {
   1331   if (!CheckState(STATE_INIT, error))
   1332     return false;
   1333 
   1334   SessionInitiate init;
   1335   if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
   1336                             GetContentParsers(), GetTransportParsers(),
   1337                             GetCandidateTranslators(),
   1338                             &init, error))
   1339     return false;
   1340 
   1341   SessionError session_error;
   1342   if (!CreateTransportProxies(init.transports, &session_error)) {
   1343     return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
   1344                       session_error.text, error);
   1345   }
   1346 
   1347   set_remote_name(msg.from);
   1348   set_initiator_name(msg.initiator);
   1349   set_remote_description(new SessionDescription(init.ClearContents(),
   1350                                                 init.transports,
   1351                                                 init.groups));
   1352   // Updating transport with TransportDescription.
   1353   PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL);
   1354   SetState(STATE_RECEIVEDINITIATE);
   1355 
   1356   // Users of Session may listen to state change and call Reject().
   1357   if (state() != STATE_SENTREJECT) {
   1358     if (!OnRemoteCandidates(init.transports, error))
   1359       return false;
   1360 
   1361     // TODO(juberti): Auto-generate and push down the local transport answer.
   1362     // This is necessary for trickling to work with RFC 5245 ICE.
   1363   }
   1364   return true;
   1365 }
   1366 
   1367 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
   1368   if (!CheckState(STATE_SENTINITIATE, error))
   1369     return false;
   1370 
   1371   SessionAccept accept;
   1372   if (!ParseSessionAccept(msg.protocol, msg.action_elem,
   1373                           GetContentParsers(), GetTransportParsers(),
   1374                           GetCandidateTranslators(),
   1375                           &accept, error)) {
   1376     return false;
   1377   }
   1378 
   1379   // If we get an accept, we can assume the initiate has been
   1380   // received, even if we haven't gotten an IQ response.
   1381   OnInitiateAcked();
   1382 
   1383   set_remote_description(new SessionDescription(accept.ClearContents(),
   1384                                                 accept.transports,
   1385                                                 accept.groups));
   1386   // Updating transport with TransportDescription.
   1387   PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL);
   1388   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
   1389   SetState(STATE_RECEIVEDACCEPT);
   1390 
   1391   if (!OnRemoteCandidates(accept.transports, error))
   1392     return false;
   1393 
   1394   return true;
   1395 }
   1396 
   1397 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
   1398   if (!CheckState(STATE_SENTINITIATE, error))
   1399     return false;
   1400 
   1401   SetState(STATE_RECEIVEDREJECT);
   1402   return true;
   1403 }
   1404 
   1405 bool Session::OnInfoMessage(const SessionMessage& msg) {
   1406   SignalInfoMessage(this, msg.action_elem);
   1407   return true;
   1408 }
   1409 
   1410 bool Session::OnTerminateMessage(const SessionMessage& msg,
   1411                                  MessageError* error) {
   1412   SessionTerminate term;
   1413   if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
   1414     return false;
   1415 
   1416   SignalReceivedTerminateReason(this, term.reason);
   1417   if (term.debug_reason != buzz::STR_EMPTY) {
   1418     LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
   1419   }
   1420 
   1421   SetState(STATE_RECEIVEDTERMINATE);
   1422   return true;
   1423 }
   1424 
   1425 bool Session::OnTransportInfoMessage(const SessionMessage& msg,
   1426                                      MessageError* error) {
   1427   TransportInfos tinfos;
   1428   if (!ParseTransportInfos(msg.protocol, msg.action_elem,
   1429                            initiator_description()->contents(),
   1430                            GetTransportParsers(), GetCandidateTranslators(),
   1431                            &tinfos, error))
   1432     return false;
   1433 
   1434   if (!OnRemoteCandidates(tinfos, error))
   1435     return false;
   1436 
   1437   return true;
   1438 }
   1439 
   1440 bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
   1441                                        MessageError* error) {
   1442   // TODO: Currently here only for compatibility with
   1443   // Gingle 1.1 clients (notably, Google Voice).
   1444   return true;
   1445 }
   1446 
   1447 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
   1448                               MessageError* error) {
   1449   if (!CheckState(STATE_INPROGRESS, error))
   1450     return false;
   1451 
   1452   DescriptionInfo description_info;
   1453   if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
   1454                             GetContentParsers(), GetTransportParsers(),
   1455                             GetCandidateTranslators(),
   1456                             &description_info, error)) {
   1457     return false;
   1458   }
   1459 
   1460   ContentInfos& updated_contents = description_info.contents;
   1461 
   1462   // TODO: Currently, reflector sends back
   1463   // video stream updates even for an audio-only call, which causes
   1464   // this to fail.  Put this back once reflector is fixed.
   1465   //
   1466   // ContentInfos::iterator it;
   1467   // First, ensure all updates are valid before modifying remote_description_.
   1468   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
   1469   //   if (remote_description()->GetContentByName(it->name) == NULL) {
   1470   //     return false;
   1471   //   }
   1472   // }
   1473 
   1474   // TODO: We used to replace contents from an update, but
   1475   // that no longer works with partial updates.  We need to figure out
   1476   // a way to merge patial updates into contents.  For now, users of
   1477   // Session should listen to SignalRemoteDescriptionUpdate and handle
   1478   // updates.  They should not expect remote_description to be the
   1479   // latest value.
   1480   //
   1481   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
   1482   //     remote_description()->RemoveContentByName(it->name);
   1483   //     remote_description()->AddContent(it->name, it->type, it->description);
   1484   //   }
   1485   // }
   1486 
   1487   SignalRemoteDescriptionUpdate(this, updated_contents);
   1488   return true;
   1489 }
   1490 
   1491 bool BareJidsEqual(const std::string& name1,
   1492                    const std::string& name2) {
   1493   buzz::Jid jid1(name1);
   1494   buzz::Jid jid2(name2);
   1495 
   1496   return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
   1497 }
   1498 
   1499 bool Session::OnRedirectError(const SessionRedirect& redirect,
   1500                               SessionError* error) {
   1501   MessageError message_error;
   1502   if (!CheckState(STATE_SENTINITIATE, &message_error)) {
   1503     return BadWrite(message_error.text, error);
   1504   }
   1505 
   1506   if (!BareJidsEqual(remote_name(), redirect.target))
   1507     return BadWrite("Redirection not allowed: must be the same bare jid.",
   1508                     error);
   1509 
   1510   // When we receive a redirect, we point the session at the new JID
   1511   // and resend the candidates.
   1512   set_remote_name(redirect.target);
   1513   return (SendInitiateMessage(local_description(), error) &&
   1514           ResendAllTransportInfoMessages(error));
   1515 }
   1516 
   1517 bool Session::CheckState(State expected, MessageError* error) {
   1518   if (state() != expected) {
   1519     // The server can deliver messages out of order/repeated for various
   1520     // reasons. For example, if the server does not recive our iq response,
   1521     // it could assume that the iq it sent was lost, and will then send
   1522     // it again. Ideally, we should implement reliable messaging with
   1523     // duplicate elimination.
   1524     return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
   1525                       "message not allowed in current state",
   1526                       error);
   1527   }
   1528   return true;
   1529 }
   1530 
   1531 void Session::SetError(Error error, const std::string& error_desc) {
   1532   BaseSession::SetError(error, error_desc);
   1533   if (error != ERROR_NONE)
   1534     signaling_thread()->Post(this, MSG_ERROR);
   1535 }
   1536 
   1537 void Session::OnMessage(talk_base::Message* pmsg) {
   1538   // preserve this because BaseSession::OnMessage may modify it
   1539   State orig_state = state();
   1540 
   1541   BaseSession::OnMessage(pmsg);
   1542 
   1543   switch (pmsg->message_id) {
   1544   case MSG_ERROR:
   1545     TerminateWithReason(STR_TERMINATE_ERROR);
   1546     break;
   1547 
   1548   case MSG_STATE:
   1549     switch (orig_state) {
   1550     case STATE_SENTREJECT:
   1551     case STATE_RECEIVEDREJECT:
   1552       // Assume clean termination.
   1553       Terminate();
   1554       break;
   1555 
   1556     case STATE_SENTTERMINATE:
   1557     case STATE_RECEIVEDTERMINATE:
   1558       session_manager_->DestroySession(this);
   1559       break;
   1560 
   1561     default:
   1562       // Explicitly ignoring some states here.
   1563       break;
   1564     }
   1565     break;
   1566   }
   1567 }
   1568 
   1569 bool Session::SendInitiateMessage(const SessionDescription* sdesc,
   1570                                   SessionError* error) {
   1571   SessionInitiate init;
   1572   init.contents = sdesc->contents();
   1573   init.transports = GetEmptyTransportInfos(init.contents);
   1574   init.groups = sdesc->groups();
   1575   return SendMessage(ACTION_SESSION_INITIATE, init, error);
   1576 }
   1577 
   1578 bool Session::WriteSessionAction(
   1579     SignalingProtocol protocol, const SessionInitiate& init,
   1580     XmlElements* elems, WriteError* error) {
   1581   return WriteSessionInitiate(protocol, init.contents, init.transports,
   1582                               GetContentParsers(), GetTransportParsers(),
   1583                               GetCandidateTranslators(), init.groups,
   1584                               elems, error);
   1585 }
   1586 
   1587 bool Session::SendAcceptMessage(const SessionDescription* sdesc,
   1588                                 SessionError* error) {
   1589   XmlElements elems;
   1590   if (!WriteSessionAccept(current_protocol_,
   1591                           sdesc->contents(),
   1592                           GetEmptyTransportInfos(sdesc->contents()),
   1593                           GetContentParsers(), GetTransportParsers(),
   1594                           GetCandidateTranslators(), sdesc->groups(),
   1595                           &elems, error)) {
   1596     return false;
   1597   }
   1598   return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
   1599 }
   1600 
   1601 bool Session::SendRejectMessage(const std::string& reason,
   1602                                 SessionError* error) {
   1603   SessionTerminate term(reason);
   1604   return SendMessage(ACTION_SESSION_REJECT, term, error);
   1605 }
   1606 
   1607 bool Session::SendTerminateMessage(const std::string& reason,
   1608                                    SessionError* error) {
   1609   SessionTerminate term(reason);
   1610   return SendMessage(ACTION_SESSION_TERMINATE, term, error);
   1611 }
   1612 
   1613 bool Session::WriteSessionAction(SignalingProtocol protocol,
   1614                                  const SessionTerminate& term,
   1615                                  XmlElements* elems, WriteError* error) {
   1616   WriteSessionTerminate(protocol, term, elems);
   1617   return true;
   1618 }
   1619 
   1620 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
   1621                                        SessionError* error) {
   1622   return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
   1623 }
   1624 
   1625 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
   1626                                        const Candidates& candidates,
   1627                                        SessionError* error) {
   1628   return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
   1629       TransportDescription(transproxy->type(), std::vector<std::string>(),
   1630                            std::string(), std::string(), ICEMODE_FULL,
   1631                            CONNECTIONROLE_NONE, NULL, candidates)), error);
   1632 }
   1633 
   1634 bool Session::WriteSessionAction(SignalingProtocol protocol,
   1635                                  const TransportInfo& tinfo,
   1636                                  XmlElements* elems, WriteError* error) {
   1637   TransportInfos tinfos;
   1638   tinfos.push_back(tinfo);
   1639   return WriteTransportInfos(protocol, tinfos,
   1640                              GetTransportParsers(), GetCandidateTranslators(),
   1641                              elems, error);
   1642 }
   1643 
   1644 bool Session::ResendAllTransportInfoMessages(SessionError* error) {
   1645   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1646        iter != transport_proxies().end(); ++iter) {
   1647     TransportProxy* transproxy = iter->second;
   1648     if (transproxy->sent_candidates().size() > 0) {
   1649       if (!SendTransportInfoMessage(
   1650               transproxy, transproxy->sent_candidates(), error)) {
   1651         LOG(LS_ERROR) << "Could not resend transport info messages: "
   1652                       << error->text;
   1653         return false;
   1654       }
   1655       transproxy->ClearSentCandidates();
   1656     }
   1657   }
   1658   return true;
   1659 }
   1660 
   1661 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
   1662   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1663        iter != transport_proxies().end(); ++iter) {
   1664     TransportProxy* transproxy = iter->second;
   1665     if (transproxy->unsent_candidates().size() > 0) {
   1666       if (!SendTransportInfoMessage(
   1667               transproxy, transproxy->unsent_candidates(), error)) {
   1668         LOG(LS_ERROR) << "Could not send unsent transport info messages: "
   1669                       << error->text;
   1670         return false;
   1671       }
   1672       transproxy->ClearUnsentCandidates();
   1673     }
   1674   }
   1675   return true;
   1676 }
   1677 
   1678 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
   1679                           SessionError* error) {
   1680     return SendMessage(type, action_elems, remote_name(), error);
   1681 }
   1682 
   1683 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
   1684                           const std::string& remote_name, SessionError* error) {
   1685   talk_base::scoped_ptr<buzz::XmlElement> stanza(
   1686       new buzz::XmlElement(buzz::QN_IQ));
   1687 
   1688   SessionMessage msg(current_protocol_, type, id(), initiator_name());
   1689   msg.to = remote_name;
   1690   WriteSessionMessage(msg, action_elems, stanza.get());
   1691 
   1692   SignalOutgoingMessage(this, stanza.get());
   1693   return true;
   1694 }
   1695 
   1696 template <typename Action>
   1697 bool Session::SendMessage(ActionType type, const Action& action,
   1698                           SessionError* error) {
   1699   talk_base::scoped_ptr<buzz::XmlElement> stanza(
   1700       new buzz::XmlElement(buzz::QN_IQ));
   1701   if (!WriteActionMessage(type, action, stanza.get(), error))
   1702     return false;
   1703 
   1704   SignalOutgoingMessage(this, stanza.get());
   1705   return true;
   1706 }
   1707 
   1708 template <typename Action>
   1709 bool Session::WriteActionMessage(ActionType type, const Action& action,
   1710                                  buzz::XmlElement* stanza,
   1711                                  WriteError* error) {
   1712   if (current_protocol_ == PROTOCOL_HYBRID) {
   1713     if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
   1714       return false;
   1715     if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
   1716       return false;
   1717   } else {
   1718     if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
   1719       return false;
   1720   }
   1721   return true;
   1722 }
   1723 
   1724 template <typename Action>
   1725 bool Session::WriteActionMessage(SignalingProtocol protocol,
   1726                                  ActionType type, const Action& action,
   1727                                  buzz::XmlElement* stanza, WriteError* error) {
   1728   XmlElements action_elems;
   1729   if (!WriteSessionAction(protocol, action, &action_elems, error))
   1730     return false;
   1731 
   1732   SessionMessage msg(protocol, type, id(), initiator_name());
   1733   msg.to = remote_name();
   1734 
   1735   WriteSessionMessage(msg, action_elems, stanza);
   1736   return true;
   1737 }
   1738 
   1739 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
   1740   talk_base::scoped_ptr<buzz::XmlElement> ack(
   1741       new buzz::XmlElement(buzz::QN_IQ));
   1742   ack->SetAttr(buzz::QN_TO, remote_name());
   1743   ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
   1744   ack->SetAttr(buzz::QN_TYPE, "result");
   1745 
   1746   SignalOutgoingMessage(this, ack.get());
   1747 }
   1748 
   1749 }  // namespace cricket
   1750