Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/p2p/base/transport.h"
     29 
     30 #include "talk/base/common.h"
     31 #include "talk/base/logging.h"
     32 #include "talk/p2p/base/candidate.h"
     33 #include "talk/p2p/base/constants.h"
     34 #include "talk/p2p/base/sessionmanager.h"
     35 #include "talk/p2p/base/parsing.h"
     36 #include "talk/p2p/base/transportchannelimpl.h"
     37 #include "talk/xmllite/xmlelement.h"
     38 #include "talk/xmpp/constants.h"
     39 
     40 namespace cricket {
     41 
     42 struct ChannelParams {
     43   ChannelParams() : channel(NULL), candidate(NULL) {}
     44   explicit ChannelParams(const std::string& name)
     45       : name(name), channel(NULL), candidate(NULL) {}
     46   ChannelParams(const std::string& name,
     47                 const std::string& content_type)
     48       : name(name), content_type(content_type),
     49         channel(NULL), candidate(NULL) {}
     50   explicit ChannelParams(cricket::Candidate* candidate) :
     51       channel(NULL), candidate(candidate) {
     52     name = candidate->name();
     53   }
     54 
     55   ~ChannelParams() {
     56     delete candidate;
     57   }
     58 
     59   std::string name;
     60   std::string content_type;
     61   cricket::TransportChannelImpl* channel;
     62   cricket::Candidate* candidate;
     63 };
     64 typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage;
     65 
     66 enum {
     67   MSG_CREATECHANNEL = 1,
     68   MSG_DESTROYCHANNEL = 2,
     69   MSG_DESTROYALLCHANNELS = 3,
     70   MSG_CONNECTCHANNELS = 4,
     71   MSG_RESETCHANNELS = 5,
     72   MSG_ONSIGNALINGREADY = 6,
     73   MSG_ONREMOTECANDIDATE = 7,
     74   MSG_READSTATE = 8,
     75   MSG_WRITESTATE = 9,
     76   MSG_REQUESTSIGNALING = 10,
     77   MSG_ONCHANNELCANDIDATEREADY = 11,
     78   MSG_CONNECTING = 12,
     79 };
     80 
     81 Transport::Transport(talk_base::Thread* signaling_thread,
     82                      talk_base::Thread* worker_thread,
     83                      const std::string& type,
     84                      PortAllocator* allocator)
     85   : signaling_thread_(signaling_thread),
     86     worker_thread_(worker_thread), type_(type), allocator_(allocator),
     87     destroyed_(false), readable_(false), writable_(false),
     88     connect_requested_(false), allow_local_ips_(false) {
     89 }
     90 
     91 Transport::~Transport() {
     92   ASSERT(signaling_thread_->IsCurrent());
     93   ASSERT(destroyed_);
     94 }
     95 
     96 TransportChannelImpl* Transport::CreateChannel(
     97     const std::string& name, const std::string& content_type) {
     98   ChannelParams params(name, content_type);
     99   ChannelMessage msg(&params);
    100   worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
    101   return msg.data()->channel;
    102 }
    103 
    104 TransportChannelImpl* Transport::CreateChannel_w(
    105     const std::string& name, const std::string& content_type) {
    106   ASSERT(worker_thread()->IsCurrent());
    107 
    108   TransportChannelImpl* impl = CreateTransportChannel(name, content_type);
    109   impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
    110   impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
    111   impl->SignalRequestSignaling.connect(
    112       this, &Transport::OnChannelRequestSignaling);
    113   impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
    114 
    115   talk_base::CritScope cs(&crit_);
    116   ASSERT(channels_.find(name) == channels_.end());
    117   channels_[name] = impl;
    118   destroyed_ = false;
    119   if (connect_requested_) {
    120     impl->Connect();
    121     if (channels_.size() == 1) {
    122       // If this is the first channel, then indicate that we have started
    123       // connecting.
    124       signaling_thread()->Post(this, MSG_CONNECTING, NULL);
    125     }
    126   }
    127   return impl;
    128 }
    129 
    130 TransportChannelImpl* Transport::GetChannel(const std::string& name) {
    131   talk_base::CritScope cs(&crit_);
    132   ChannelMap::iterator iter = channels_.find(name);
    133   return (iter != channels_.end()) ? iter->second : NULL;
    134 }
    135 
    136 bool Transport::HasChannels() {
    137   talk_base::CritScope cs(&crit_);
    138   return !channels_.empty();
    139 }
    140 
    141 void Transport::DestroyChannel(const std::string& name) {
    142   ChannelParams params(name);
    143   ChannelMessage msg(&params);
    144   worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
    145 }
    146 
    147 void Transport::DestroyChannel_w(const std::string& name) {
    148   ASSERT(worker_thread()->IsCurrent());
    149 
    150   TransportChannelImpl* impl = NULL;
    151   {
    152     talk_base::CritScope cs(&crit_);
    153     ChannelMap::iterator iter = channels_.find(name);
    154     if (iter == channels_.end())
    155       return;
    156     impl = iter->second;
    157     channels_.erase(iter);
    158   }
    159 
    160   if (connect_requested_ && channels_.empty()) {
    161     // We're no longer attempting to connect.
    162     signaling_thread()->Post(this, MSG_CONNECTING, NULL);
    163   }
    164 
    165   if (impl) {
    166     // Check in case the deleted channel was the only non-writable channel.
    167     OnChannelWritableState(impl);
    168     DestroyTransportChannel(impl);
    169   }
    170 }
    171 
    172 void Transport::ConnectChannels() {
    173   ASSERT(signaling_thread()->IsCurrent());
    174   worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
    175 }
    176 
    177 void Transport::ConnectChannels_w() {
    178   ASSERT(worker_thread()->IsCurrent());
    179   if (connect_requested_ || channels_.empty())
    180     return;
    181   connect_requested_ = true;
    182   signaling_thread()->Post(
    183       this, MSG_ONCHANNELCANDIDATEREADY, NULL);
    184   CallChannels_w(&TransportChannelImpl::Connect);
    185   if (!channels_.empty()) {
    186     signaling_thread()->Post(this, MSG_CONNECTING, NULL);
    187   }
    188 }
    189 
    190 void Transport::OnConnecting_s() {
    191   ASSERT(signaling_thread()->IsCurrent());
    192   SignalConnecting(this);
    193 }
    194 
    195 void Transport::DestroyAllChannels() {
    196   ASSERT(signaling_thread()->IsCurrent());
    197   worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
    198   worker_thread()->Clear(this);
    199   signaling_thread()->Clear(this);
    200   destroyed_ = true;
    201 }
    202 
    203 void Transport::DestroyAllChannels_w() {
    204   ASSERT(worker_thread()->IsCurrent());
    205   std::vector<TransportChannelImpl*> impls;
    206   {
    207     talk_base::CritScope cs(&crit_);
    208     for (ChannelMap::iterator iter = channels_.begin();
    209          iter != channels_.end();
    210          ++iter) {
    211       impls.push_back(iter->second);
    212     }
    213     channels_.clear();
    214   }
    215 
    216   for (size_t i = 0; i < impls.size(); ++i)
    217     DestroyTransportChannel(impls[i]);
    218 }
    219 
    220 void Transport::ResetChannels() {
    221   ASSERT(signaling_thread()->IsCurrent());
    222   worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
    223 }
    224 
    225 void Transport::ResetChannels_w() {
    226   ASSERT(worker_thread()->IsCurrent());
    227 
    228   // We are no longer attempting to connect
    229   connect_requested_ = false;
    230 
    231   // Clear out the old messages, they aren't relevant
    232   talk_base::CritScope cs(&crit_);
    233   ready_candidates_.clear();
    234 
    235   // Reset all of the channels
    236   CallChannels_w(&TransportChannelImpl::Reset);
    237 }
    238 
    239 void Transport::OnSignalingReady() {
    240   ASSERT(signaling_thread()->IsCurrent());
    241   if (destroyed_) return;
    242 
    243   worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
    244 
    245   // Notify the subclass.
    246   OnTransportSignalingReady();
    247 }
    248 
    249 void Transport::CallChannels_w(TransportChannelFunc func) {
    250   ASSERT(worker_thread()->IsCurrent());
    251   talk_base::CritScope cs(&crit_);
    252   for (ChannelMap::iterator iter = channels_.begin();
    253        iter != channels_.end();
    254        ++iter) {
    255     ((iter->second)->*func)();
    256   }
    257 }
    258 
    259 bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
    260   if (cand.address().IsLocalIP() && !allow_local_ips_)
    261     return BadParse("candidate has local IP address", error);
    262 
    263   // No address zero.
    264   if (cand.address().IsAny()) {
    265     return BadParse("candidate has address of zero", error);
    266   }
    267 
    268   // Disallow all ports below 1024, except for 80 and 443 on public addresses.
    269   int port = cand.address().port();
    270   if (port < 1024) {
    271     if ((port != 80) && (port != 443))
    272       return BadParse(
    273           "candidate has port below 1024, but not 80 or 443", error);
    274     if (cand.address().IsPrivateIP()) {
    275       return BadParse(
    276           "candidate has port of 80 or 443 with private IP address", error);
    277     }
    278   }
    279 
    280   return true;
    281 }
    282 
    283 void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
    284   for (std::vector<Candidate>::const_iterator iter = candidates.begin();
    285        iter != candidates.end();
    286        ++iter) {
    287     OnRemoteCandidate(*iter);
    288   }
    289 }
    290 
    291 void Transport::OnRemoteCandidate(const Candidate& candidate) {
    292   ASSERT(signaling_thread()->IsCurrent());
    293   if (destroyed_) return;
    294   if (!HasChannel(candidate.name())) {
    295     LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
    296                     << candidate.name();
    297     return;
    298   }
    299 
    300   // new candidate deleted when params is deleted
    301   ChannelParams* params = new ChannelParams(new Candidate(candidate));
    302   ChannelMessage* msg = new ChannelMessage(params);
    303   worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
    304 }
    305 
    306 void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
    307   ASSERT(worker_thread()->IsCurrent());
    308   ChannelMap::iterator iter = channels_.find(candidate.name());
    309   // It's ok for a channel to go away while this message is in transit.
    310   if (iter != channels_.end()) {
    311     iter->second->OnCandidate(candidate);
    312   }
    313 }
    314 
    315 void Transport::OnChannelReadableState(TransportChannel* channel) {
    316   ASSERT(worker_thread()->IsCurrent());
    317   signaling_thread()->Post(this, MSG_READSTATE, NULL);
    318 }
    319 
    320 void Transport::OnChannelReadableState_s() {
    321   ASSERT(signaling_thread()->IsCurrent());
    322   bool readable = GetTransportState_s(true);
    323   if (readable_ != readable) {
    324     readable_ = readable;
    325     SignalReadableState(this);
    326   }
    327 }
    328 
    329 void Transport::OnChannelWritableState(TransportChannel* channel) {
    330   ASSERT(worker_thread()->IsCurrent());
    331   signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
    332 }
    333 
    334 void Transport::OnChannelWritableState_s() {
    335   ASSERT(signaling_thread()->IsCurrent());
    336   bool writable = GetTransportState_s(false);
    337   if (writable_ != writable) {
    338     writable_ = writable;
    339     SignalWritableState(this);
    340   }
    341 }
    342 
    343 bool Transport::GetTransportState_s(bool read) {
    344   ASSERT(signaling_thread()->IsCurrent());
    345   bool result = false;
    346   talk_base::CritScope cs(&crit_);
    347   for (ChannelMap::iterator iter = channels_.begin();
    348        iter != channels_.end();
    349        ++iter) {
    350     bool b = (read ? iter->second->readable() : iter->second->writable());
    351     result = result || b;
    352   }
    353   return result;
    354 }
    355 
    356 void Transport::OnChannelRequestSignaling() {
    357   ASSERT(worker_thread()->IsCurrent());
    358   signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);
    359 }
    360 
    361 void Transport::OnChannelRequestSignaling_s() {
    362   ASSERT(signaling_thread()->IsCurrent());
    363   SignalRequestSignaling(this);
    364 }
    365 
    366 void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
    367                                         const Candidate& candidate) {
    368   ASSERT(worker_thread()->IsCurrent());
    369   talk_base::CritScope cs(&crit_);
    370   ready_candidates_.push_back(candidate);
    371 
    372   // We hold any messages until the client lets us connect.
    373   if (connect_requested_) {
    374     signaling_thread()->Post(
    375         this, MSG_ONCHANNELCANDIDATEREADY, NULL);
    376   }
    377 }
    378 
    379 void Transport::OnChannelCandidateReady_s() {
    380   ASSERT(signaling_thread()->IsCurrent());
    381   ASSERT(connect_requested_);
    382 
    383   std::vector<Candidate> candidates;
    384   {
    385     talk_base::CritScope cs(&crit_);
    386     candidates.swap(ready_candidates_);
    387   }
    388 
    389   // we do the deleting of Candidate* here to keep the new above and
    390   // delete below close to each other
    391   if (!candidates.empty()) {
    392     SignalCandidatesReady(this, candidates);
    393   }
    394 }
    395 
    396 void Transport::OnMessage(talk_base::Message* msg) {
    397   switch (msg->message_id) {
    398   case MSG_CREATECHANNEL:
    399     {
    400       ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
    401       params->channel = CreateChannel_w(params->name, params->content_type);
    402     }
    403     break;
    404   case MSG_DESTROYCHANNEL:
    405     {
    406       ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
    407       DestroyChannel_w(params->name);
    408     }
    409     break;
    410   case MSG_CONNECTCHANNELS:
    411     ConnectChannels_w();
    412     break;
    413   case MSG_RESETCHANNELS:
    414     ResetChannels_w();
    415     break;
    416   case MSG_DESTROYALLCHANNELS:
    417     DestroyAllChannels_w();
    418     break;
    419   case MSG_ONSIGNALINGREADY:
    420     CallChannels_w(&TransportChannelImpl::OnSignalingReady);
    421     break;
    422   case MSG_ONREMOTECANDIDATE:
    423     {
    424       ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
    425       ChannelParams* params = channel_msg->data();
    426       OnRemoteCandidate_w(*(params->candidate));
    427       delete params;
    428       delete channel_msg;
    429     }
    430     break;
    431   case MSG_CONNECTING:
    432     OnConnecting_s();
    433     break;
    434   case MSG_READSTATE:
    435     OnChannelReadableState_s();
    436     break;
    437   case MSG_WRITESTATE:
    438     OnChannelWritableState_s();
    439     break;
    440   case MSG_REQUESTSIGNALING:
    441     OnChannelRequestSignaling_s();
    442     break;
    443   case MSG_ONCHANNELCANDIDATEREADY:
    444     OnChannelCandidateReady_s();
    445     break;
    446   }
    447 }
    448 
    449 bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
    450                                    const buzz::QName& address_name,
    451                                    const buzz::QName& port_name,
    452                                    talk_base::SocketAddress* address,
    453                                    ParseError* error) {
    454   if (!elem->HasAttr(address_name))
    455     return BadParse("address does not have " + address_name.LocalPart(), error);
    456   if (!elem->HasAttr(port_name))
    457     return BadParse("address does not have " + port_name.LocalPart(), error);
    458 
    459   address->SetIP(elem->Attr(address_name));
    460   std::istringstream ist(elem->Attr(port_name));
    461   int port = 0;
    462   ist >> port;
    463   address->SetPort(port);
    464 
    465   return true;
    466 }
    467 
    468 }  // namespace cricket
    469