Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/p2p/base/session.h"
     29 
     30 #include "talk/base/bind.h"
     31 #include "talk/base/common.h"
     32 #include "talk/base/logging.h"
     33 #include "talk/base/helpers.h"
     34 #include "talk/base/scoped_ptr.h"
     35 #include "talk/base/sslstreamadapter.h"
     36 #include "talk/xmpp/constants.h"
     37 #include "talk/xmpp/jid.h"
     38 #include "talk/p2p/base/dtlstransport.h"
     39 #include "talk/p2p/base/p2ptransport.h"
     40 #include "talk/p2p/base/sessionclient.h"
     41 #include "talk/p2p/base/transport.h"
     42 #include "talk/p2p/base/transportchannelproxy.h"
     43 #include "talk/p2p/base/transportinfo.h"
     44 
     45 #include "talk/p2p/base/constants.h"
     46 
     47 namespace cricket {
     48 
     49 using talk_base::Bind;
     50 
     51 bool BadMessage(const buzz::QName type,
     52                 const std::string& text,
     53                 MessageError* err) {
     54   err->SetType(type);
     55   err->SetText(text);
     56   return false;
     57 }
     58 
     59 TransportProxy::~TransportProxy() {
     60   for (ChannelMap::iterator iter = channels_.begin();
     61        iter != channels_.end(); ++iter) {
     62     iter->second->SignalDestroyed(iter->second);
     63     delete iter->second;
     64   }
     65 }
     66 
     67 std::string TransportProxy::type() const {
     68   return transport_->get()->type();
     69 }
     70 
     71 TransportChannel* TransportProxy::GetChannel(int component) {
     72   ASSERT(talk_base::Thread::Current() == worker_thread_);
     73   return GetChannelProxy(component);
     74 }
     75 
     76 TransportChannel* TransportProxy::CreateChannel(
     77     const std::string& name, int component) {
     78   ASSERT(talk_base::Thread::Current() == worker_thread_);
     79   ASSERT(GetChannel(component) == NULL);
     80   ASSERT(!transport_->get()->HasChannel(component));
     81 
     82   // We always create a proxy in case we need to change out the transport later.
     83   TransportChannelProxy* channel =
     84       new TransportChannelProxy(content_name(), name, component);
     85   channels_[component] = channel;
     86 
     87   // If we're already negotiated, create an impl and hook it up to the proxy
     88   // channel. If we're connecting, create an impl but don't hook it up yet.
     89   if (negotiated_) {
     90     SetupChannelProxy_w(component, channel);
     91   } else if (connecting_) {
     92     GetOrCreateChannelProxyImpl_w(component);
     93   }
     94   return channel;
     95 }
     96 
     97 bool TransportProxy::HasChannel(int component) {
     98   return transport_->get()->HasChannel(component);
     99 }
    100 
    101 void TransportProxy::DestroyChannel(int component) {
    102   ASSERT(talk_base::Thread::Current() == worker_thread_);
    103   TransportChannel* channel = GetChannel(component);
    104   if (channel) {
    105     // If the state of TransportProxy is not NEGOTIATED
    106     // then TransportChannelProxy and its impl are not
    107     // connected. Both must be connected before
    108     // deletion.
    109     if (!negotiated_) {
    110       SetupChannelProxy_w(component, GetChannelProxy(component));
    111     }
    112 
    113     channels_.erase(component);
    114     channel->SignalDestroyed(channel);
    115     delete channel;
    116   }
    117 }
    118 
    119 void TransportProxy::ConnectChannels() {
    120   if (!connecting_) {
    121     if (!negotiated_) {
    122       for (ChannelMap::iterator iter = channels_.begin();
    123            iter != channels_.end(); ++iter) {
    124         GetOrCreateChannelProxyImpl(iter->first);
    125       }
    126     }
    127     connecting_ = true;
    128   }
    129   // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
    130   // don't have any channels yet, so we need to allow this method to be called
    131   // multiple times. Once we fix Transport, we can move this call inside the
    132   // if (!connecting_) block.
    133   transport_->get()->ConnectChannels();
    134 }
    135 
    136 void TransportProxy::CompleteNegotiation() {
    137   if (!negotiated_) {
    138     for (ChannelMap::iterator iter = channels_.begin();
    139          iter != channels_.end(); ++iter) {
    140       SetupChannelProxy(iter->first, iter->second);
    141     }
    142     negotiated_ = true;
    143   }
    144 }
    145 
    146 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
    147   for (Candidates::const_iterator cand = candidates.begin();
    148        cand != candidates.end(); ++cand) {
    149     sent_candidates_.push_back(*cand);
    150   }
    151 }
    152 
    153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
    154   for (Candidates::const_iterator cand = candidates.begin();
    155        cand != candidates.end(); ++cand) {
    156     unsent_candidates_.push_back(*cand);
    157   }
    158 }
    159 
    160 bool TransportProxy::GetChannelNameFromComponent(
    161     int component, std::string* channel_name) const {
    162   const TransportChannelProxy* channel = GetChannelProxy(component);
    163   if (channel == NULL) {
    164     return false;
    165   }
    166 
    167   *channel_name = channel->name();
    168   return true;
    169 }
    170 
    171 bool TransportProxy::GetComponentFromChannelName(
    172     const std::string& channel_name, int* component) const {
    173   const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
    174   if (channel == NULL) {
    175     return false;
    176   }
    177 
    178   *component = channel->component();
    179   return true;
    180 }
    181 
    182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
    183   ChannelMap::const_iterator iter = channels_.find(component);
    184   return (iter != channels_.end()) ? iter->second : NULL;
    185 }
    186 
    187 TransportChannelProxy* TransportProxy::GetChannelProxyByName(
    188     const std::string& name) const {
    189   for (ChannelMap::const_iterator iter = channels_.begin();
    190        iter != channels_.end();
    191        ++iter) {
    192     if (iter->second->name() == name) {
    193       return iter->second;
    194     }
    195   }
    196   return NULL;
    197 }
    198 
    199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
    200     int component) {
    201   return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
    202       &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
    203 }
    204 
    205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w(
    206     int component) {
    207   ASSERT(talk_base::Thread::Current() == worker_thread_);
    208   TransportChannelImpl* impl = transport_->get()->GetChannel(component);
    209   if (impl == NULL) {
    210     impl = transport_->get()->CreateChannel(component);
    211   }
    212   return impl;
    213 }
    214 
    215 void TransportProxy::SetupChannelProxy(
    216     int component, TransportChannelProxy* transproxy) {
    217   worker_thread_->Invoke<void>(Bind(
    218       &TransportProxy::SetupChannelProxy_w, this, component, transproxy));
    219 }
    220 
    221 void TransportProxy::SetupChannelProxy_w(
    222     int component, TransportChannelProxy* transproxy) {
    223   ASSERT(talk_base::Thread::Current() == worker_thread_);
    224   TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
    225   ASSERT(impl != NULL);
    226   transproxy->SetImplementation(impl);
    227 }
    228 
    229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
    230                                              TransportChannelImpl* impl) {
    231   worker_thread_->Invoke<void>(Bind(
    232       &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl));
    233 }
    234 
    235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy,
    236                                                TransportChannelImpl* impl) {
    237   ASSERT(talk_base::Thread::Current() == worker_thread_);
    238   ASSERT(proxy != NULL);
    239   proxy->SetImplementation(impl);
    240 }
    241 
    242 // This function muxes |this| onto |target| by repointing |this| at
    243 // |target|'s transport and setting our TransportChannelProxies
    244 // to point to |target|'s underlying implementations.
    245 bool TransportProxy::SetupMux(TransportProxy* target) {
    246   // Bail out if there's nothing to do.
    247   if (transport_ == target->transport_) {
    248     return true;
    249   }
    250 
    251   // Run through all channels and remove any non-rtp transport channels before
    252   // setting target transport channels.
    253   for (ChannelMap::const_iterator iter = channels_.begin();
    254        iter != channels_.end(); ++iter) {
    255     if (!target->transport_->get()->HasChannel(iter->first)) {
    256       // Remove if channel doesn't exist in |transport_|.
    257       ReplaceChannelProxyImpl(iter->second, NULL);
    258     } else {
    259       // Replace the impl for all the TransportProxyChannels with the channels
    260       // from |target|'s transport. Fail if there's not an exact match.
    261       ReplaceChannelProxyImpl(
    262           iter->second, target->transport_->get()->CreateChannel(iter->first));
    263     }
    264   }
    265 
    266   // Now replace our transport. Must happen afterwards because
    267   // it deletes all impls as a side effect.
    268   transport_ = target->transport_;
    269   transport_->get()->SignalCandidatesReady.connect(
    270       this, &TransportProxy::OnTransportCandidatesReady);
    271   set_candidates_allocated(target->candidates_allocated());
    272   return true;
    273 }
    274 
    275 void TransportProxy::SetIceRole(IceRole role) {
    276   transport_->get()->SetIceRole(role);
    277 }
    278 
    279 bool TransportProxy::SetLocalTransportDescription(
    280     const TransportDescription& description, ContentAction action) {
    281   // If this is an answer, finalize the negotiation.
    282   if (action == CA_ANSWER) {
    283     CompleteNegotiation();
    284   }
    285   return transport_->get()->SetLocalTransportDescription(description, action);
    286 }
    287 
    288 bool TransportProxy::SetRemoteTransportDescription(
    289     const TransportDescription& description, ContentAction action) {
    290   // If this is an answer, finalize the negotiation.
    291   if (action == CA_ANSWER) {
    292     CompleteNegotiation();
    293   }
    294   return transport_->get()->SetRemoteTransportDescription(description, action);
    295 }
    296 
    297 void TransportProxy::OnSignalingReady() {
    298   // If we're starting a new allocation sequence, reset our state.
    299   set_candidates_allocated(false);
    300   transport_->get()->OnSignalingReady();
    301 }
    302 
    303 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
    304                                         std::string* error) {
    305   // Ensure the transport is negotiated before handling candidates.
    306   // TODO(juberti): Remove this once everybody calls SetLocalTD.
    307   CompleteNegotiation();
    308 
    309   // Verify each candidate before passing down to transport layer.
    310   for (Candidates::const_iterator cand = candidates.begin();
    311        cand != candidates.end(); ++cand) {
    312     if (!transport_->get()->VerifyCandidate(*cand, error))
    313       return false;
    314     if (!HasChannel(cand->component())) {
    315       *error = "Candidate has unknown component: " + cand->ToString() +
    316                " for content: " + content_name_;
    317       return false;
    318     }
    319   }
    320   transport_->get()->OnRemoteCandidates(candidates);
    321   return true;
    322 }
    323 
    324 void TransportProxy::SetIdentity(
    325     talk_base::SSLIdentity* identity) {
    326   transport_->get()->SetIdentity(identity);
    327 }
    328 
    329 std::string BaseSession::StateToString(State state) {
    330   switch (state) {
    331     case Session::STATE_INIT:
    332       return "STATE_INIT";
    333     case Session::STATE_SENTINITIATE:
    334       return "STATE_SENTINITIATE";
    335     case Session::STATE_RECEIVEDINITIATE:
    336       return "STATE_RECEIVEDINITIATE";
    337     case Session::STATE_SENTPRACCEPT:
    338       return "STATE_SENTPRACCEPT";
    339     case Session::STATE_SENTACCEPT:
    340       return "STATE_SENTACCEPT";
    341     case Session::STATE_RECEIVEDPRACCEPT:
    342       return "STATE_RECEIVEDPRACCEPT";
    343     case Session::STATE_RECEIVEDACCEPT:
    344       return "STATE_RECEIVEDACCEPT";
    345     case Session::STATE_SENTMODIFY:
    346       return "STATE_SENTMODIFY";
    347     case Session::STATE_RECEIVEDMODIFY:
    348       return "STATE_RECEIVEDMODIFY";
    349     case Session::STATE_SENTREJECT:
    350       return "STATE_SENTREJECT";
    351     case Session::STATE_RECEIVEDREJECT:
    352       return "STATE_RECEIVEDREJECT";
    353     case Session::STATE_SENTREDIRECT:
    354       return "STATE_SENTREDIRECT";
    355     case Session::STATE_SENTTERMINATE:
    356       return "STATE_SENTTERMINATE";
    357     case Session::STATE_RECEIVEDTERMINATE:
    358       return "STATE_RECEIVEDTERMINATE";
    359     case Session::STATE_INPROGRESS:
    360       return "STATE_INPROGRESS";
    361     case Session::STATE_DEINIT:
    362       return "STATE_DEINIT";
    363     default:
    364       break;
    365   }
    366   return "STATE_" + talk_base::ToString(state);
    367 }
    368 
    369 BaseSession::BaseSession(talk_base::Thread* signaling_thread,
    370                          talk_base::Thread* worker_thread,
    371                          PortAllocator* port_allocator,
    372                          const std::string& sid,
    373                          const std::string& content_type,
    374                          bool initiator)
    375     : state_(STATE_INIT),
    376       error_(ERROR_NONE),
    377       signaling_thread_(signaling_thread),
    378       worker_thread_(worker_thread),
    379       port_allocator_(port_allocator),
    380       sid_(sid),
    381       content_type_(content_type),
    382       transport_type_(NS_GINGLE_P2P),
    383       initiator_(initiator),
    384       identity_(NULL),
    385       local_description_(NULL),
    386       remote_description_(NULL),
    387       ice_tiebreaker_(talk_base::CreateRandomId64()),
    388       role_switch_(false) {
    389   ASSERT(signaling_thread->IsCurrent());
    390 }
    391 
    392 BaseSession::~BaseSession() {
    393   ASSERT(signaling_thread()->IsCurrent());
    394 
    395   ASSERT(state_ != STATE_DEINIT);
    396   LogState(state_, STATE_DEINIT);
    397   state_ = STATE_DEINIT;
    398   SignalState(this, state_);
    399 
    400   for (TransportMap::iterator iter = transports_.begin();
    401        iter != transports_.end(); ++iter) {
    402     delete iter->second;
    403   }
    404 
    405   delete remote_description_;
    406   delete local_description_;
    407 }
    408 
    409 bool BaseSession::SetIdentity(talk_base::SSLIdentity* identity) {
    410   if (identity_)
    411     return false;
    412   identity_ = identity;
    413   for (TransportMap::iterator iter = transports_.begin();
    414        iter != transports_.end(); ++iter) {
    415     iter->second->SetIdentity(identity_);
    416   }
    417   return true;
    418 }
    419 
    420 bool BaseSession::PushdownTransportDescription(ContentSource source,
    421                                                ContentAction action) {
    422   if (source == CS_LOCAL) {
    423     return PushdownLocalTransportDescription(local_description_, action);
    424   }
    425   return PushdownRemoteTransportDescription(remote_description_, action);
    426 }
    427 
    428 bool BaseSession::PushdownLocalTransportDescription(
    429     const SessionDescription* sdesc,
    430     ContentAction action) {
    431   // Update the Transports with the right information, and trigger them to
    432   // start connecting.
    433   for (TransportMap::iterator iter = transports_.begin();
    434        iter != transports_.end(); ++iter) {
    435     // If no transport info was in this session description, ret == false
    436     // and we just skip this one.
    437     TransportDescription tdesc;
    438     bool ret = GetTransportDescription(
    439         sdesc, iter->second->content_name(), &tdesc);
    440     if (ret) {
    441       if (!iter->second->SetLocalTransportDescription(tdesc, action)) {
    442         return false;
    443       }
    444 
    445       iter->second->ConnectChannels();
    446     }
    447   }
    448 
    449   return true;
    450 }
    451 
    452 bool BaseSession::PushdownRemoteTransportDescription(
    453     const SessionDescription* sdesc,
    454     ContentAction action) {
    455   // Update the Transports with the right information.
    456   for (TransportMap::iterator iter = transports_.begin();
    457        iter != transports_.end(); ++iter) {
    458     TransportDescription tdesc;
    459 
    460     // If no transport info was in this session description, ret == false
    461     // and we just skip this one.
    462     bool ret = GetTransportDescription(
    463         sdesc, iter->second->content_name(), &tdesc);
    464     if (ret) {
    465       if (!iter->second->SetRemoteTransportDescription(tdesc, action)) {
    466         return false;
    467       }
    468     }
    469   }
    470 
    471   return true;
    472 }
    473 
    474 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
    475                                              const std::string& channel_name,
    476                                              int component) {
    477   // We create the proxy "on demand" here because we need to support
    478   // creating channels at any time, even before we send or receive
    479   // initiate messages, which is before we create the transports.
    480   TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
    481   return transproxy->CreateChannel(channel_name, component);
    482 }
    483 
    484 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
    485                                           int component) {
    486   TransportProxy* transproxy = GetTransportProxy(content_name);
    487   if (transproxy == NULL)
    488     return NULL;
    489   else
    490     return transproxy->GetChannel(component);
    491 }
    492 
    493 void BaseSession::DestroyChannel(const std::string& content_name,
    494                                  int component) {
    495   TransportProxy* transproxy = GetTransportProxy(content_name);
    496   ASSERT(transproxy != NULL);
    497   transproxy->DestroyChannel(component);
    498 }
    499 
    500 TransportProxy* BaseSession::GetOrCreateTransportProxy(
    501     const std::string& content_name) {
    502   TransportProxy* transproxy = GetTransportProxy(content_name);
    503   if (transproxy)
    504     return transproxy;
    505 
    506   Transport* transport = CreateTransport(content_name);
    507   transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
    508   transport->SetIceTiebreaker(ice_tiebreaker_);
    509   // TODO: Connect all the Transport signals to TransportProxy
    510   // then to the BaseSession.
    511   transport->SignalConnecting.connect(
    512       this, &BaseSession::OnTransportConnecting);
    513   transport->SignalWritableState.connect(
    514       this, &BaseSession::OnTransportWritable);
    515   transport->SignalRequestSignaling.connect(
    516       this, &BaseSession::OnTransportRequestSignaling);
    517   transport->SignalTransportError.connect(
    518       this, &BaseSession::OnTransportSendError);
    519   transport->SignalRouteChange.connect(
    520       this, &BaseSession::OnTransportRouteChange);
    521   transport->SignalCandidatesAllocationDone.connect(
    522       this, &BaseSession::OnTransportCandidatesAllocationDone);
    523   transport->SignalRoleConflict.connect(
    524       this, &BaseSession::OnRoleConflict);
    525 
    526   transproxy = new TransportProxy(worker_thread_, sid_, content_name,
    527                                   new TransportWrapper(transport));
    528   transproxy->SignalCandidatesReady.connect(
    529       this, &BaseSession::OnTransportProxyCandidatesReady);
    530   transports_[content_name] = transproxy;
    531 
    532   return transproxy;
    533 }
    534 
    535 Transport* BaseSession::GetTransport(const std::string& content_name) {
    536   TransportProxy* transproxy = GetTransportProxy(content_name);
    537   if (transproxy == NULL)
    538     return NULL;
    539   return transproxy->impl();
    540 }
    541 
    542 TransportProxy* BaseSession::GetTransportProxy(
    543     const std::string& content_name) {
    544   TransportMap::iterator iter = transports_.find(content_name);
    545   return (iter != transports_.end()) ? iter->second : NULL;
    546 }
    547 
    548 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
    549   for (TransportMap::iterator iter = transports_.begin();
    550        iter != transports_.end(); ++iter) {
    551     TransportProxy* transproxy = iter->second;
    552     if (transproxy->impl() == transport) {
    553       return transproxy;
    554     }
    555   }
    556   return NULL;
    557 }
    558 
    559 TransportProxy* BaseSession::GetFirstTransportProxy() {
    560   if (transports_.empty())
    561     return NULL;
    562   return transports_.begin()->second;
    563 }
    564 
    565 void BaseSession::DestroyTransportProxy(
    566     const std::string& content_name) {
    567   TransportMap::iterator iter = transports_.find(content_name);
    568   if (iter != transports_.end()) {
    569     delete iter->second;
    570     transports_.erase(content_name);
    571   }
    572 }
    573 
    574 cricket::Transport* BaseSession::CreateTransport(
    575     const std::string& content_name) {
    576   ASSERT(transport_type_ == NS_GINGLE_P2P);
    577   return new cricket::DtlsTransport<P2PTransport>(
    578       signaling_thread(), worker_thread(), content_name,
    579       port_allocator(), identity_);
    580 }
    581 
    582 bool BaseSession::GetStats(SessionStats* stats) {
    583   for (TransportMap::iterator iter = transports_.begin();
    584        iter != transports_.end(); ++iter) {
    585     std::string proxy_id = iter->second->content_name();
    586     // We are ignoring not-yet-instantiated transports.
    587     if (iter->second->impl()) {
    588       std::string transport_id = iter->second->impl()->content_name();
    589       stats->proxy_to_transport[proxy_id] = transport_id;
    590       if (stats->transport_stats.find(transport_id)
    591           == stats->transport_stats.end()) {
    592         TransportStats subinfos;
    593         if (!iter->second->impl()->GetStats(&subinfos)) {
    594           return false;
    595         }
    596         stats->transport_stats[transport_id] = subinfos;
    597       }
    598     }
    599   }
    600   return true;
    601 }
    602 
    603 void BaseSession::SetState(State state) {
    604   ASSERT(signaling_thread_->IsCurrent());
    605   if (state != state_) {
    606     LogState(state_, state);
    607     state_ = state;
    608     SignalState(this, state_);
    609     signaling_thread_->Post(this, MSG_STATE);
    610   }
    611   SignalNewDescription();
    612 }
    613 
    614 void BaseSession::SetError(Error error) {
    615   ASSERT(signaling_thread_->IsCurrent());
    616   if (error != error_) {
    617     error_ = error;
    618     SignalError(this, error);
    619   }
    620 }
    621 
    622 void BaseSession::OnSignalingReady() {
    623   ASSERT(signaling_thread()->IsCurrent());
    624   for (TransportMap::iterator iter = transports_.begin();
    625        iter != transports_.end(); ++iter) {
    626     iter->second->OnSignalingReady();
    627   }
    628 }
    629 
    630 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to
    631 // start, remove this method once everyone calls PushdownLocalTD.
    632 void BaseSession::SpeculativelyConnectAllTransportChannels() {
    633   // Put all transports into the connecting state.
    634   for (TransportMap::iterator iter = transports_.begin();
    635        iter != transports_.end(); ++iter) {
    636     iter->second->ConnectChannels();
    637   }
    638 }
    639 
    640 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
    641                                      const Candidates& candidates,
    642                                      std::string* error) {
    643   // Give candidates to the appropriate transport, and tell that transport
    644   // to start connecting, if it's not already doing so.
    645   TransportProxy* transproxy = GetTransportProxy(content_name);
    646   if (!transproxy) {
    647     *error = "Unknown content name " + content_name;
    648     return false;
    649   }
    650   if (!transproxy->OnRemoteCandidates(candidates, error)) {
    651     return false;
    652   }
    653   // TODO(juberti): Remove this call once we can be sure that we always have
    654   // a local transport description (which will trigger the connection).
    655   transproxy->ConnectChannels();
    656   return true;
    657 }
    658 
    659 bool BaseSession::MaybeEnableMuxingSupport() {
    660   // We need both a local and remote description to decide if we should mux.
    661   if ((state_ == STATE_SENTINITIATE ||
    662       state_ == STATE_RECEIVEDINITIATE) &&
    663       ((local_description_ == NULL) ||
    664       (remote_description_ == NULL))) {
    665     return false;
    666   }
    667 
    668   // In order to perform the multiplexing, we need all proxies to be in the
    669   // negotiated state, i.e. to have implementations underneath.
    670   // Ensure that this is the case, regardless of whether we are going to mux.
    671   for (TransportMap::iterator iter = transports_.begin();
    672        iter != transports_.end(); ++iter) {
    673     ASSERT(iter->second->negotiated());
    674     if (!iter->second->negotiated())
    675       return false;
    676   }
    677 
    678   // If both sides agree to BUNDLE, mux all the specified contents onto the
    679   // transport belonging to the first content name in the BUNDLE group.
    680   // If the contents are already muxed, this will be a no-op.
    681   // TODO(juberti): Should this check that local and remote have configured
    682   // BUNDLE the same way?
    683   bool candidates_allocated = IsCandidateAllocationDone();
    684   const ContentGroup* local_bundle_group =
    685       local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
    686   const ContentGroup* remote_bundle_group =
    687       remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
    688   if (local_bundle_group && remote_bundle_group &&
    689       local_bundle_group->FirstContentName()) {
    690     const std::string* content_name = local_bundle_group->FirstContentName();
    691     const ContentInfo* content =
    692         local_description_->GetContentByName(*content_name);
    693     ASSERT(content != NULL);
    694     if (!SetSelectedProxy(content->name, local_bundle_group)) {
    695       LOG(LS_WARNING) << "Failed to set up BUNDLE";
    696       return false;
    697     }
    698 
    699     // If we weren't done gathering before, we might be done now, as a result
    700     // of enabling mux.
    701     LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
    702                  << *content_name;
    703     if (!candidates_allocated) {
    704       MaybeCandidateAllocationDone();
    705     }
    706   } else {
    707     LOG(LS_INFO) << "No BUNDLE information, not bundling.";
    708   }
    709   return true;
    710 }
    711 
    712 bool BaseSession::SetSelectedProxy(const std::string& content_name,
    713                                    const ContentGroup* muxed_group) {
    714   TransportProxy* selected_proxy = GetTransportProxy(content_name);
    715   if (!selected_proxy) {
    716     return false;
    717   }
    718 
    719   ASSERT(selected_proxy->negotiated());
    720   for (TransportMap::iterator iter = transports_.begin();
    721        iter != transports_.end(); ++iter) {
    722     // If content is part of the mux group, then repoint its proxy at the
    723     // transport object that we have chosen to mux onto. If the proxy
    724     // is already pointing at the right object, it will be a no-op.
    725     if (muxed_group->HasContentName(iter->first) &&
    726         !iter->second->SetupMux(selected_proxy)) {
    727       return false;
    728     }
    729   }
    730   return true;
    731 }
    732 
    733 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
    734   // TODO(juberti): This is a clunky way of processing the done signal. Instead,
    735   // TransportProxy should receive the done signal directly, set its allocated
    736   // flag internally, and then reissue the done signal to Session.
    737   // Overall we should make TransportProxy receive *all* the signals from
    738   // Transport, since this removes the need to manually iterate over all
    739   // the transports, as is needed to make sure signals are handled properly
    740   // when BUNDLEing.
    741 #if 0
    742   ASSERT(!IsCandidateAllocationDone());
    743 #endif
    744   for (TransportMap::iterator iter = transports_.begin();
    745        iter != transports_.end(); ++iter) {
    746     if (iter->second->impl() == transport) {
    747       iter->second->set_candidates_allocated(true);
    748     }
    749   }
    750   MaybeCandidateAllocationDone();
    751 }
    752 
    753 bool BaseSession::IsCandidateAllocationDone() const {
    754   for (TransportMap::const_iterator iter = transports_.begin();
    755        iter != transports_.end(); ++iter) {
    756     if (!iter->second->candidates_allocated())
    757       return false;
    758   }
    759   return true;
    760 }
    761 
    762 void BaseSession::MaybeCandidateAllocationDone() {
    763   if (IsCandidateAllocationDone()) {
    764     LOG(LS_INFO) << "Candidate gathering is complete.";
    765     OnCandidatesAllocationDone();
    766   }
    767 }
    768 
    769 void BaseSession::OnRoleConflict() {
    770   if (role_switch_) {
    771     LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
    772     return;
    773   }
    774 
    775   role_switch_ = true;
    776   for (TransportMap::iterator iter = transports_.begin();
    777        iter != transports_.end(); ++iter) {
    778     // Role will be reverse of initial role setting.
    779     IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
    780     iter->second->SetIceRole(role);
    781   }
    782 }
    783 
    784 void BaseSession::LogState(State old_state, State new_state) {
    785   LOG(LS_INFO) << "Session:" << id()
    786                << " Old state:" << StateToString(old_state)
    787                << " New state:" << StateToString(new_state)
    788                << " Type:" << content_type()
    789                << " Transport:" << transport_type();
    790 }
    791 
    792 bool BaseSession::GetTransportDescription(const SessionDescription* description,
    793                                           const std::string& content_name,
    794                                           TransportDescription* tdesc) {
    795   if (!description || !tdesc) {
    796     return false;
    797   }
    798   const TransportInfo* transport_info =
    799       description->GetTransportInfoByName(content_name);
    800   if (!transport_info) {
    801     return false;
    802   }
    803   *tdesc = transport_info->description;
    804   return true;
    805 }
    806 
    807 void BaseSession::SignalNewDescription() {
    808   ContentAction action;
    809   ContentSource source;
    810   if (!GetContentAction(&action, &source)) {
    811     return;
    812   }
    813   if (source == CS_LOCAL) {
    814     SignalNewLocalDescription(this, action);
    815   } else {
    816     SignalNewRemoteDescription(this, action);
    817   }
    818 }
    819 
    820 bool BaseSession::GetContentAction(ContentAction* action,
    821                                    ContentSource* source) {
    822   switch (state_) {
    823     // new local description
    824     case STATE_SENTINITIATE:
    825       *action = CA_OFFER;
    826       *source = CS_LOCAL;
    827       break;
    828     case STATE_SENTPRACCEPT:
    829       *action = CA_PRANSWER;
    830       *source = CS_LOCAL;
    831       break;
    832     case STATE_SENTACCEPT:
    833       *action = CA_ANSWER;
    834       *source = CS_LOCAL;
    835       break;
    836     // new remote description
    837     case STATE_RECEIVEDINITIATE:
    838       *action = CA_OFFER;
    839       *source = CS_REMOTE;
    840       break;
    841     case STATE_RECEIVEDPRACCEPT:
    842       *action = CA_PRANSWER;
    843       *source = CS_REMOTE;
    844       break;
    845     case STATE_RECEIVEDACCEPT:
    846       *action = CA_ANSWER;
    847       *source = CS_REMOTE;
    848       break;
    849     default:
    850       return false;
    851   }
    852   return true;
    853 }
    854 
    855 void BaseSession::OnMessage(talk_base::Message *pmsg) {
    856   switch (pmsg->message_id) {
    857   case MSG_TIMEOUT:
    858     // Session timeout has occured.
    859     SetError(ERROR_TIME);
    860     break;
    861 
    862   case MSG_STATE:
    863     switch (state_) {
    864     case STATE_SENTACCEPT:
    865     case STATE_RECEIVEDACCEPT:
    866       SetState(STATE_INPROGRESS);
    867       break;
    868 
    869     default:
    870       // Explicitly ignoring some states here.
    871       break;
    872     }
    873     break;
    874   }
    875 }
    876 
    877 Session::Session(SessionManager* session_manager,
    878                  const std::string& local_name,
    879                  const std::string& initiator_name,
    880                  const std::string& sid,
    881                  const std::string& content_type,
    882                  SessionClient* client)
    883     : BaseSession(session_manager->signaling_thread(),
    884                   session_manager->worker_thread(),
    885                   session_manager->port_allocator(),
    886                   sid, content_type, initiator_name == local_name) {
    887   ASSERT(client != NULL);
    888   session_manager_ = session_manager;
    889   local_name_ = local_name;
    890   initiator_name_ = initiator_name;
    891   transport_parser_ = new P2PTransportParser();
    892   client_ = client;
    893   initiate_acked_ = false;
    894   current_protocol_ = PROTOCOL_HYBRID;
    895 }
    896 
    897 Session::~Session() {
    898   delete transport_parser_;
    899 }
    900 
    901 bool Session::Initiate(const std::string &to,
    902                        const SessionDescription* sdesc) {
    903   ASSERT(signaling_thread()->IsCurrent());
    904   SessionError error;
    905 
    906   // Only from STATE_INIT
    907   if (state() != STATE_INIT)
    908     return false;
    909 
    910   // Setup for signaling.
    911   set_remote_name(to);
    912   set_local_description(sdesc);
    913   if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
    914                               &error)) {
    915     LOG(LS_ERROR) << "Could not create transports: " << error.text;
    916     return false;
    917   }
    918 
    919   if (!SendInitiateMessage(sdesc, &error)) {
    920     LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
    921     return false;
    922   }
    923 
    924   // We need to connect transport proxy and impl here so that we can process
    925   // the TransportDescriptions.
    926   SpeculativelyConnectAllTransportChannels();
    927 
    928   PushdownTransportDescription(CS_LOCAL, CA_OFFER);
    929   SetState(Session::STATE_SENTINITIATE);
    930   return true;
    931 }
    932 
    933 bool Session::Accept(const SessionDescription* sdesc) {
    934   ASSERT(signaling_thread()->IsCurrent());
    935 
    936   // Only if just received initiate
    937   if (state() != STATE_RECEIVEDINITIATE)
    938     return false;
    939 
    940   // Setup for signaling.
    941   set_local_description(sdesc);
    942 
    943   SessionError error;
    944   if (!SendAcceptMessage(sdesc, &error)) {
    945     LOG(LS_ERROR) << "Could not send accept message: " << error.text;
    946     return false;
    947   }
    948   // TODO(juberti): Add BUNDLE support to transport-info messages.
    949   PushdownTransportDescription(CS_LOCAL, CA_ANSWER);
    950   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
    951   SetState(Session::STATE_SENTACCEPT);
    952   return true;
    953 }
    954 
    955 bool Session::Reject(const std::string& reason) {
    956   ASSERT(signaling_thread()->IsCurrent());
    957 
    958   // Reject is sent in response to an initiate or modify, to reject the
    959   // request
    960   if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
    961     return false;
    962 
    963   SessionError error;
    964   if (!SendRejectMessage(reason, &error)) {
    965     LOG(LS_ERROR) << "Could not send reject message: " << error.text;
    966     return false;
    967   }
    968 
    969   SetState(STATE_SENTREJECT);
    970   return true;
    971 }
    972 
    973 bool Session::TerminateWithReason(const std::string& reason) {
    974   ASSERT(signaling_thread()->IsCurrent());
    975 
    976   // Either side can terminate, at any time.
    977   switch (state()) {
    978     case STATE_SENTTERMINATE:
    979     case STATE_RECEIVEDTERMINATE:
    980       return false;
    981 
    982     case STATE_SENTREJECT:
    983     case STATE_RECEIVEDREJECT:
    984       // We don't need to send terminate if we sent or received a reject...
    985       // it's implicit.
    986       break;
    987 
    988     default:
    989       SessionError error;
    990       if (!SendTerminateMessage(reason, &error)) {
    991         LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
    992         return false;
    993       }
    994       break;
    995   }
    996 
    997   SetState(STATE_SENTTERMINATE);
    998   return true;
    999 }
   1000 
   1001 bool Session::SendInfoMessage(const XmlElements& elems,
   1002                               const std::string& remote_name) {
   1003   ASSERT(signaling_thread()->IsCurrent());
   1004   SessionError error;
   1005   if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
   1006     LOG(LS_ERROR) << "Could not send info message " << error.text;
   1007     return false;
   1008   }
   1009   return true;
   1010 }
   1011 
   1012 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
   1013   XmlElements elems;
   1014   WriteError write_error;
   1015   if (!WriteDescriptionInfo(current_protocol_,
   1016                             contents,
   1017                             GetContentParsers(),
   1018                             &elems, &write_error)) {
   1019     LOG(LS_ERROR) << "Could not write description info message: "
   1020                   << write_error.text;
   1021     return false;
   1022   }
   1023   SessionError error;
   1024   if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
   1025     LOG(LS_ERROR) << "Could not send description info message: "
   1026                   << error.text;
   1027     return false;
   1028   }
   1029   return true;
   1030 }
   1031 
   1032 TransportInfos Session::GetEmptyTransportInfos(
   1033     const ContentInfos& contents) const {
   1034   TransportInfos tinfos;
   1035   for (ContentInfos::const_iterator content = contents.begin();
   1036        content != contents.end(); ++content) {
   1037     tinfos.push_back(TransportInfo(content->name,
   1038                                    TransportDescription(transport_type(),
   1039                                                         std::string(),
   1040                                                         std::string())));
   1041   }
   1042   return tinfos;
   1043 }
   1044 
   1045 bool Session::OnRemoteCandidates(
   1046     const TransportInfos& tinfos, ParseError* error) {
   1047   for (TransportInfos::const_iterator tinfo = tinfos.begin();
   1048        tinfo != tinfos.end(); ++tinfo) {
   1049     std::string str_error;
   1050     if (!BaseSession::OnRemoteCandidates(
   1051         tinfo->content_name, tinfo->description.candidates, &str_error)) {
   1052       return BadParse(str_error, error);
   1053     }
   1054   }
   1055   return true;
   1056 }
   1057 
   1058 bool Session::CreateTransportProxies(const TransportInfos& tinfos,
   1059                                      SessionError* error) {
   1060   for (TransportInfos::const_iterator tinfo = tinfos.begin();
   1061        tinfo != tinfos.end(); ++tinfo) {
   1062     if (tinfo->description.transport_type != transport_type()) {
   1063       error->SetText("No supported transport in offer.");
   1064       return false;
   1065     }
   1066 
   1067     GetOrCreateTransportProxy(tinfo->content_name);
   1068   }
   1069   return true;
   1070 }
   1071 
   1072 TransportParserMap Session::GetTransportParsers() {
   1073   TransportParserMap parsers;
   1074   parsers[transport_type()] = transport_parser_;
   1075   return parsers;
   1076 }
   1077 
   1078 CandidateTranslatorMap Session::GetCandidateTranslators() {
   1079   CandidateTranslatorMap translators;
   1080   // NOTE: This technique makes it impossible to parse G-ICE
   1081   // candidates in session-initiate messages because the channels
   1082   // aren't yet created at that point.  Since we don't use candidates
   1083   // in session-initiate messages, we should be OK.  Once we switch to
   1084   // ICE, this translation shouldn't be necessary.
   1085   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1086        iter != transport_proxies().end(); ++iter) {
   1087     translators[iter->first] = iter->second;
   1088   }
   1089   return translators;
   1090 }
   1091 
   1092 ContentParserMap Session::GetContentParsers() {
   1093   ContentParserMap parsers;
   1094   parsers[content_type()] = client_;
   1095   // We need to be able parse both RTP-based and SCTP-based Jingle
   1096   // with the same client.
   1097   if (content_type() == NS_JINGLE_RTP) {
   1098     parsers[NS_JINGLE_DRAFT_SCTP] = client_;
   1099   }
   1100   return parsers;
   1101 }
   1102 
   1103 void Session::OnTransportRequestSignaling(Transport* transport) {
   1104   ASSERT(signaling_thread()->IsCurrent());
   1105   TransportProxy* transproxy = GetTransportProxy(transport);
   1106   ASSERT(transproxy != NULL);
   1107   if (transproxy) {
   1108     // Reset candidate allocation status for the transport proxy.
   1109     transproxy->set_candidates_allocated(false);
   1110   }
   1111   SignalRequestSignaling(this);
   1112 }
   1113 
   1114 void Session::OnTransportConnecting(Transport* transport) {
   1115   // This is an indication that we should begin watching the writability
   1116   // state of the transport.
   1117   OnTransportWritable(transport);
   1118 }
   1119 
   1120 void Session::OnTransportWritable(Transport* transport) {
   1121   ASSERT(signaling_thread()->IsCurrent());
   1122 
   1123   // If the transport is not writable, start a timer to make sure that it
   1124   // becomes writable within a reasonable amount of time.  If it does not, we
   1125   // terminate since we can't actually send data.  If the transport is writable,
   1126   // cancel the timer.  Note that writability transitions may occur repeatedly
   1127   // during the lifetime of the session.
   1128   signaling_thread()->Clear(this, MSG_TIMEOUT);
   1129   if (transport->HasChannels() && !transport->writable()) {
   1130     signaling_thread()->PostDelayed(
   1131         session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
   1132   }
   1133 }
   1134 
   1135 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
   1136                                               const Candidates& candidates) {
   1137   ASSERT(signaling_thread()->IsCurrent());
   1138   if (transproxy != NULL) {
   1139     if (initiator() && !initiate_acked_) {
   1140       // TODO: This is to work around server re-ordering
   1141       // messages.  We send the candidates once the session-initiate
   1142       // is acked.  Once we have fixed the server to guarantee message
   1143       // order, we can remove this case.
   1144       transproxy->AddUnsentCandidates(candidates);
   1145     } else {
   1146       if (!transproxy->negotiated()) {
   1147         transproxy->AddSentCandidates(candidates);
   1148       }
   1149       SessionError error;
   1150       if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
   1151         LOG(LS_ERROR) << "Could not send transport info message: "
   1152                       << error.text;
   1153         return;
   1154       }
   1155     }
   1156   }
   1157 }
   1158 
   1159 void Session::OnTransportSendError(Transport* transport,
   1160                                    const buzz::XmlElement* stanza,
   1161                                    const buzz::QName& name,
   1162                                    const std::string& type,
   1163                                    const std::string& text,
   1164                                    const buzz::XmlElement* extra_info) {
   1165   ASSERT(signaling_thread()->IsCurrent());
   1166   SignalErrorMessage(this, stanza, name, type, text, extra_info);
   1167 }
   1168 
   1169 void Session::OnIncomingMessage(const SessionMessage& msg) {
   1170   ASSERT(signaling_thread()->IsCurrent());
   1171   ASSERT(state() == STATE_INIT || msg.from == remote_name());
   1172 
   1173   if (current_protocol_== PROTOCOL_HYBRID) {
   1174     if (msg.protocol == PROTOCOL_GINGLE) {
   1175       current_protocol_ = PROTOCOL_GINGLE;
   1176     } else {
   1177       current_protocol_ = PROTOCOL_JINGLE;
   1178     }
   1179   }
   1180 
   1181   bool valid = false;
   1182   MessageError error;
   1183   switch (msg.type) {
   1184     case ACTION_SESSION_INITIATE:
   1185       valid = OnInitiateMessage(msg, &error);
   1186       break;
   1187     case ACTION_SESSION_INFO:
   1188       valid = OnInfoMessage(msg);
   1189       break;
   1190     case ACTION_SESSION_ACCEPT:
   1191       valid = OnAcceptMessage(msg, &error);
   1192       break;
   1193     case ACTION_SESSION_REJECT:
   1194       valid = OnRejectMessage(msg, &error);
   1195       break;
   1196     case ACTION_SESSION_TERMINATE:
   1197       valid = OnTerminateMessage(msg, &error);
   1198       break;
   1199     case ACTION_TRANSPORT_INFO:
   1200       valid = OnTransportInfoMessage(msg, &error);
   1201       break;
   1202     case ACTION_TRANSPORT_ACCEPT:
   1203       valid = OnTransportAcceptMessage(msg, &error);
   1204       break;
   1205     case ACTION_DESCRIPTION_INFO:
   1206       valid = OnDescriptionInfoMessage(msg, &error);
   1207       break;
   1208     default:
   1209       valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
   1210                          "unknown session message type",
   1211                          &error);
   1212   }
   1213 
   1214   if (valid) {
   1215     SendAcknowledgementMessage(msg.stanza);
   1216   } else {
   1217     SignalErrorMessage(this, msg.stanza, error.type,
   1218                        "modify", error.text, NULL);
   1219   }
   1220 }
   1221 
   1222 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
   1223                                  const buzz::XmlElement* response_stanza,
   1224                                  const SessionMessage& msg) {
   1225   ASSERT(signaling_thread()->IsCurrent());
   1226 
   1227   if (msg.type == ACTION_SESSION_INITIATE) {
   1228     OnInitiateAcked();
   1229   }
   1230 }
   1231 
   1232 void Session::OnInitiateAcked() {
   1233     // TODO: This is to work around server re-ordering
   1234     // messages.  We send the candidates once the session-initiate
   1235     // is acked.  Once we have fixed the server to guarantee message
   1236     // order, we can remove this case.
   1237   if (!initiate_acked_) {
   1238     initiate_acked_ = true;
   1239     SessionError error;
   1240     SendAllUnsentTransportInfoMessages(&error);
   1241   }
   1242 }
   1243 
   1244 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
   1245                            const buzz::XmlElement* error_stanza) {
   1246   ASSERT(signaling_thread()->IsCurrent());
   1247 
   1248   SessionMessage msg;
   1249   ParseError parse_error;
   1250   if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
   1251     LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
   1252                   << ":" << orig_stanza;
   1253     return;
   1254   }
   1255 
   1256   // If the error is a session redirect, call OnRedirectError, which will
   1257   // continue the session with a new remote JID.
   1258   SessionRedirect redirect;
   1259   if (FindSessionRedirect(error_stanza, &redirect)) {
   1260     SessionError error;
   1261     if (!OnRedirectError(redirect, &error)) {
   1262       // TODO: Should we send a message back?  The standard
   1263       // says nothing about it.
   1264       LOG(LS_ERROR) << "Failed to redirect: " << error.text;
   1265       SetError(ERROR_RESPONSE);
   1266     }
   1267     return;
   1268   }
   1269 
   1270   std::string error_type = "cancel";
   1271 
   1272   const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
   1273   if (error) {
   1274     error_type = error->Attr(buzz::QN_TYPE);
   1275 
   1276     LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
   1277                   << "in response to:\n" << orig_stanza->Str();
   1278   } else {
   1279     // don't crash if <error> is missing
   1280     LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
   1281     return;
   1282   }
   1283 
   1284   if (msg.type == ACTION_TRANSPORT_INFO) {
   1285     // Transport messages frequently generate errors because they are sent right
   1286     // when we detect a network failure.  For that reason, we ignore such
   1287     // errors, because if we do not establish writability again, we will
   1288     // terminate anyway.  The exceptions are transport-specific error tags,
   1289     // which we pass on to the respective transport.
   1290   } else if ((error_type != "continue") && (error_type != "wait")) {
   1291     // We do not set an error if the other side said it is okay to continue
   1292     // (possibly after waiting).  These errors can be ignored.
   1293     SetError(ERROR_RESPONSE);
   1294   }
   1295 }
   1296 
   1297 bool Session::OnInitiateMessage(const SessionMessage& msg,
   1298                                 MessageError* error) {
   1299   if (!CheckState(STATE_INIT, error))
   1300     return false;
   1301 
   1302   SessionInitiate init;
   1303   if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
   1304                             GetContentParsers(), GetTransportParsers(),
   1305                             GetCandidateTranslators(),
   1306                             &init, error))
   1307     return false;
   1308 
   1309   SessionError session_error;
   1310   if (!CreateTransportProxies(init.transports, &session_error)) {
   1311     return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
   1312                       session_error.text, error);
   1313   }
   1314 
   1315   set_remote_name(msg.from);
   1316   set_initiator_name(msg.initiator);
   1317   set_remote_description(new SessionDescription(init.ClearContents(),
   1318                                                 init.transports,
   1319                                                 init.groups));
   1320   // Updating transport with TransportDescription.
   1321   PushdownTransportDescription(CS_REMOTE, CA_OFFER);
   1322   SetState(STATE_RECEIVEDINITIATE);
   1323 
   1324   // Users of Session may listen to state change and call Reject().
   1325   if (state() != STATE_SENTREJECT) {
   1326     if (!OnRemoteCandidates(init.transports, error))
   1327       return false;
   1328 
   1329     // TODO(juberti): Auto-generate and push down the local transport answer.
   1330     // This is necessary for trickling to work with RFC 5245 ICE.
   1331   }
   1332   return true;
   1333 }
   1334 
   1335 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
   1336   if (!CheckState(STATE_SENTINITIATE, error))
   1337     return false;
   1338 
   1339   SessionAccept accept;
   1340   if (!ParseSessionAccept(msg.protocol, msg.action_elem,
   1341                           GetContentParsers(), GetTransportParsers(),
   1342                           GetCandidateTranslators(),
   1343                           &accept, error)) {
   1344     return false;
   1345   }
   1346 
   1347   // If we get an accept, we can assume the initiate has been
   1348   // received, even if we haven't gotten an IQ response.
   1349   OnInitiateAcked();
   1350 
   1351   set_remote_description(new SessionDescription(accept.ClearContents(),
   1352                                                 accept.transports,
   1353                                                 accept.groups));
   1354   // Updating transport with TransportDescription.
   1355   PushdownTransportDescription(CS_REMOTE, CA_ANSWER);
   1356   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
   1357   SetState(STATE_RECEIVEDACCEPT);
   1358 
   1359   if (!OnRemoteCandidates(accept.transports, error))
   1360     return false;
   1361 
   1362   return true;
   1363 }
   1364 
   1365 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
   1366   if (!CheckState(STATE_SENTINITIATE, error))
   1367     return false;
   1368 
   1369   SetState(STATE_RECEIVEDREJECT);
   1370   return true;
   1371 }
   1372 
   1373 bool Session::OnInfoMessage(const SessionMessage& msg) {
   1374   SignalInfoMessage(this, msg.action_elem);
   1375   return true;
   1376 }
   1377 
   1378 bool Session::OnTerminateMessage(const SessionMessage& msg,
   1379                                  MessageError* error) {
   1380   SessionTerminate term;
   1381   if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
   1382     return false;
   1383 
   1384   SignalReceivedTerminateReason(this, term.reason);
   1385   if (term.debug_reason != buzz::STR_EMPTY) {
   1386     LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
   1387   }
   1388 
   1389   SetState(STATE_RECEIVEDTERMINATE);
   1390   return true;
   1391 }
   1392 
   1393 bool Session::OnTransportInfoMessage(const SessionMessage& msg,
   1394                                      MessageError* error) {
   1395   TransportInfos tinfos;
   1396   if (!ParseTransportInfos(msg.protocol, msg.action_elem,
   1397                            initiator_description()->contents(),
   1398                            GetTransportParsers(), GetCandidateTranslators(),
   1399                            &tinfos, error))
   1400     return false;
   1401 
   1402   if (!OnRemoteCandidates(tinfos, error))
   1403     return false;
   1404 
   1405   return true;
   1406 }
   1407 
   1408 bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
   1409                                        MessageError* error) {
   1410   // TODO: Currently here only for compatibility with
   1411   // Gingle 1.1 clients (notably, Google Voice).
   1412   return true;
   1413 }
   1414 
   1415 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
   1416                               MessageError* error) {
   1417   if (!CheckState(STATE_INPROGRESS, error))
   1418     return false;
   1419 
   1420   DescriptionInfo description_info;
   1421   if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
   1422                             GetContentParsers(), GetTransportParsers(),
   1423                             GetCandidateTranslators(),
   1424                             &description_info, error)) {
   1425     return false;
   1426   }
   1427 
   1428   ContentInfos& updated_contents = description_info.contents;
   1429 
   1430   // TODO: Currently, reflector sends back
   1431   // video stream updates even for an audio-only call, which causes
   1432   // this to fail.  Put this back once reflector is fixed.
   1433   //
   1434   // ContentInfos::iterator it;
   1435   // First, ensure all updates are valid before modifying remote_description_.
   1436   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
   1437   //   if (remote_description()->GetContentByName(it->name) == NULL) {
   1438   //     return false;
   1439   //   }
   1440   // }
   1441 
   1442   // TODO: We used to replace contents from an update, but
   1443   // that no longer works with partial updates.  We need to figure out
   1444   // a way to merge patial updates into contents.  For now, users of
   1445   // Session should listen to SignalRemoteDescriptionUpdate and handle
   1446   // updates.  They should not expect remote_description to be the
   1447   // latest value.
   1448   //
   1449   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
   1450   //     remote_description()->RemoveContentByName(it->name);
   1451   //     remote_description()->AddContent(it->name, it->type, it->description);
   1452   //   }
   1453   // }
   1454 
   1455   SignalRemoteDescriptionUpdate(this, updated_contents);
   1456   return true;
   1457 }
   1458 
   1459 bool BareJidsEqual(const std::string& name1,
   1460                    const std::string& name2) {
   1461   buzz::Jid jid1(name1);
   1462   buzz::Jid jid2(name2);
   1463 
   1464   return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
   1465 }
   1466 
   1467 bool Session::OnRedirectError(const SessionRedirect& redirect,
   1468                               SessionError* error) {
   1469   MessageError message_error;
   1470   if (!CheckState(STATE_SENTINITIATE, &message_error)) {
   1471     return BadWrite(message_error.text, error);
   1472   }
   1473 
   1474   if (!BareJidsEqual(remote_name(), redirect.target))
   1475     return BadWrite("Redirection not allowed: must be the same bare jid.",
   1476                     error);
   1477 
   1478   // When we receive a redirect, we point the session at the new JID
   1479   // and resend the candidates.
   1480   set_remote_name(redirect.target);
   1481   return (SendInitiateMessage(local_description(), error) &&
   1482           ResendAllTransportInfoMessages(error));
   1483 }
   1484 
   1485 bool Session::CheckState(State expected, MessageError* error) {
   1486   if (state() != expected) {
   1487     // The server can deliver messages out of order/repeated for various
   1488     // reasons. For example, if the server does not recive our iq response,
   1489     // it could assume that the iq it sent was lost, and will then send
   1490     // it again. Ideally, we should implement reliable messaging with
   1491     // duplicate elimination.
   1492     return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
   1493                       "message not allowed in current state",
   1494                       error);
   1495   }
   1496   return true;
   1497 }
   1498 
   1499 void Session::SetError(Error error) {
   1500   BaseSession::SetError(error);
   1501   if (error != ERROR_NONE)
   1502     signaling_thread()->Post(this, MSG_ERROR);
   1503 }
   1504 
   1505 void Session::OnMessage(talk_base::Message* pmsg) {
   1506   // preserve this because BaseSession::OnMessage may modify it
   1507   State orig_state = state();
   1508 
   1509   BaseSession::OnMessage(pmsg);
   1510 
   1511   switch (pmsg->message_id) {
   1512   case MSG_ERROR:
   1513     TerminateWithReason(STR_TERMINATE_ERROR);
   1514     break;
   1515 
   1516   case MSG_STATE:
   1517     switch (orig_state) {
   1518     case STATE_SENTREJECT:
   1519     case STATE_RECEIVEDREJECT:
   1520       // Assume clean termination.
   1521       Terminate();
   1522       break;
   1523 
   1524     case STATE_SENTTERMINATE:
   1525     case STATE_RECEIVEDTERMINATE:
   1526       session_manager_->DestroySession(this);
   1527       break;
   1528 
   1529     default:
   1530       // Explicitly ignoring some states here.
   1531       break;
   1532     }
   1533     break;
   1534   }
   1535 }
   1536 
   1537 bool Session::SendInitiateMessage(const SessionDescription* sdesc,
   1538                                   SessionError* error) {
   1539   SessionInitiate init;
   1540   init.contents = sdesc->contents();
   1541   init.transports = GetEmptyTransportInfos(init.contents);
   1542   init.groups = sdesc->groups();
   1543   return SendMessage(ACTION_SESSION_INITIATE, init, error);
   1544 }
   1545 
   1546 bool Session::WriteSessionAction(
   1547     SignalingProtocol protocol, const SessionInitiate& init,
   1548     XmlElements* elems, WriteError* error) {
   1549   return WriteSessionInitiate(protocol, init.contents, init.transports,
   1550                               GetContentParsers(), GetTransportParsers(),
   1551                               GetCandidateTranslators(), init.groups,
   1552                               elems, error);
   1553 }
   1554 
   1555 bool Session::SendAcceptMessage(const SessionDescription* sdesc,
   1556                                 SessionError* error) {
   1557   XmlElements elems;
   1558   if (!WriteSessionAccept(current_protocol_,
   1559                           sdesc->contents(),
   1560                           GetEmptyTransportInfos(sdesc->contents()),
   1561                           GetContentParsers(), GetTransportParsers(),
   1562                           GetCandidateTranslators(), sdesc->groups(),
   1563                           &elems, error)) {
   1564     return false;
   1565   }
   1566   return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
   1567 }
   1568 
   1569 bool Session::SendRejectMessage(const std::string& reason,
   1570                                 SessionError* error) {
   1571   SessionTerminate term(reason);
   1572   return SendMessage(ACTION_SESSION_REJECT, term, error);
   1573 }
   1574 
   1575 bool Session::SendTerminateMessage(const std::string& reason,
   1576                                    SessionError* error) {
   1577   SessionTerminate term(reason);
   1578   return SendMessage(ACTION_SESSION_TERMINATE, term, error);
   1579 }
   1580 
   1581 bool Session::WriteSessionAction(SignalingProtocol protocol,
   1582                                  const SessionTerminate& term,
   1583                                  XmlElements* elems, WriteError* error) {
   1584   WriteSessionTerminate(protocol, term, elems);
   1585   return true;
   1586 }
   1587 
   1588 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
   1589                                        SessionError* error) {
   1590   return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
   1591 }
   1592 
   1593 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
   1594                                        const Candidates& candidates,
   1595                                        SessionError* error) {
   1596   return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
   1597       TransportDescription(transproxy->type(), std::vector<std::string>(),
   1598                            std::string(), std::string(), ICEMODE_FULL,
   1599                            CONNECTIONROLE_NONE, NULL, candidates)), error);
   1600 }
   1601 
   1602 bool Session::WriteSessionAction(SignalingProtocol protocol,
   1603                                  const TransportInfo& tinfo,
   1604                                  XmlElements* elems, WriteError* error) {
   1605   TransportInfos tinfos;
   1606   tinfos.push_back(tinfo);
   1607   return WriteTransportInfos(protocol, tinfos,
   1608                              GetTransportParsers(), GetCandidateTranslators(),
   1609                              elems, error);
   1610 }
   1611 
   1612 bool Session::ResendAllTransportInfoMessages(SessionError* error) {
   1613   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1614        iter != transport_proxies().end(); ++iter) {
   1615     TransportProxy* transproxy = iter->second;
   1616     if (transproxy->sent_candidates().size() > 0) {
   1617       if (!SendTransportInfoMessage(
   1618               transproxy, transproxy->sent_candidates(), error)) {
   1619         LOG(LS_ERROR) << "Could not resend transport info messages: "
   1620                       << error->text;
   1621         return false;
   1622       }
   1623       transproxy->ClearSentCandidates();
   1624     }
   1625   }
   1626   return true;
   1627 }
   1628 
   1629 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
   1630   for (TransportMap::const_iterator iter = transport_proxies().begin();
   1631        iter != transport_proxies().end(); ++iter) {
   1632     TransportProxy* transproxy = iter->second;
   1633     if (transproxy->unsent_candidates().size() > 0) {
   1634       if (!SendTransportInfoMessage(
   1635               transproxy, transproxy->unsent_candidates(), error)) {
   1636         LOG(LS_ERROR) << "Could not send unsent transport info messages: "
   1637                       << error->text;
   1638         return false;
   1639       }
   1640       transproxy->ClearUnsentCandidates();
   1641     }
   1642   }
   1643   return true;
   1644 }
   1645 
   1646 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
   1647                           SessionError* error) {
   1648     return SendMessage(type, action_elems, remote_name(), error);
   1649 }
   1650 
   1651 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
   1652                           const std::string& remote_name, SessionError* error) {
   1653   talk_base::scoped_ptr<buzz::XmlElement> stanza(
   1654       new buzz::XmlElement(buzz::QN_IQ));
   1655 
   1656   SessionMessage msg(current_protocol_, type, id(), initiator_name());
   1657   msg.to = remote_name;
   1658   WriteSessionMessage(msg, action_elems, stanza.get());
   1659 
   1660   SignalOutgoingMessage(this, stanza.get());
   1661   return true;
   1662 }
   1663 
   1664 template <typename Action>
   1665 bool Session::SendMessage(ActionType type, const Action& action,
   1666                           SessionError* error) {
   1667   talk_base::scoped_ptr<buzz::XmlElement> stanza(
   1668       new buzz::XmlElement(buzz::QN_IQ));
   1669   if (!WriteActionMessage(type, action, stanza.get(), error))
   1670     return false;
   1671 
   1672   SignalOutgoingMessage(this, stanza.get());
   1673   return true;
   1674 }
   1675 
   1676 template <typename Action>
   1677 bool Session::WriteActionMessage(ActionType type, const Action& action,
   1678                                  buzz::XmlElement* stanza,
   1679                                  WriteError* error) {
   1680   if (current_protocol_ == PROTOCOL_HYBRID) {
   1681     if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
   1682       return false;
   1683     if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
   1684       return false;
   1685   } else {
   1686     if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
   1687       return false;
   1688   }
   1689   return true;
   1690 }
   1691 
   1692 template <typename Action>
   1693 bool Session::WriteActionMessage(SignalingProtocol protocol,
   1694                                  ActionType type, const Action& action,
   1695                                  buzz::XmlElement* stanza, WriteError* error) {
   1696   XmlElements action_elems;
   1697   if (!WriteSessionAction(protocol, action, &action_elems, error))
   1698     return false;
   1699 
   1700   SessionMessage msg(protocol, type, id(), initiator_name());
   1701   msg.to = remote_name();
   1702 
   1703   WriteSessionMessage(msg, action_elems, stanza);
   1704   return true;
   1705 }
   1706 
   1707 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
   1708   talk_base::scoped_ptr<buzz::XmlElement> ack(
   1709       new buzz::XmlElement(buzz::QN_IQ));
   1710   ack->SetAttr(buzz::QN_TO, remote_name());
   1711   ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
   1712   ack->SetAttr(buzz::QN_TYPE, "result");
   1713 
   1714   SignalOutgoingMessage(this, ack.get());
   1715 }
   1716 
   1717 }  // namespace cricket
   1718