Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/p2p/base/p2ptransportchannel.h"
     29 
     30 #include <set>
     31 #include "talk/base/common.h"
     32 #include "talk/base/logging.h"
     33 #include "talk/p2p/base/common.h"
     34 
     35 namespace {
     36 
     37 // messages for queuing up work for ourselves
     38 const uint32 MSG_SORT = 1;
     39 const uint32 MSG_PING = 2;
     40 const uint32 MSG_ALLOCATE = 3;
     41 
     42 // When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers)
     43 // for pinging.  When the socket is writable, we will use only 1 Kbps because
     44 // we don't want to degrade the quality on a modem.  These numbers should work
     45 // well on a 28.8K modem, which is the slowest connection on which the voice
     46 // quality is reasonable at all.
     47 static const uint32 PING_PACKET_SIZE = 60 * 8;
     48 static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000;  // 480ms
     49 static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000;  // 50ms
     50 
     51 // If there is a current writable connection, then we will also try hard to
     52 // make sure it is pinged at this rate.
     53 static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900;  // 2*WRITABLE_DELAY - bit
     54 
     55 // The minimum improvement in RTT that justifies a switch.
     56 static const double kMinImprovement = 10;
     57 
     58 // Amount of time that we wait when *losing* writability before we try doing
     59 // another allocation.
     60 static const int kAllocateDelay = 1 * 1000;  // 1 second
     61 
     62 // We will try creating a new allocator from scratch after a delay of this
     63 // length without becoming writable (or timing out).
     64 static const int kAllocatePeriod = 20 * 1000;  // 20 seconds
     65 
     66 cricket::Port::CandidateOrigin GetOrigin(cricket::Port* port,
     67                                          cricket::Port* origin_port) {
     68   if (!origin_port)
     69     return cricket::Port::ORIGIN_MESSAGE;
     70   else if (port == origin_port)
     71     return cricket::Port::ORIGIN_THIS_PORT;
     72   else
     73     return cricket::Port::ORIGIN_OTHER_PORT;
     74 }
     75 
     76 // Compares two connections based only on static information about them.
     77 int CompareConnectionCandidates(cricket::Connection* a,
     78                                 cricket::Connection* b) {
     79   // Combine local and remote preferences
     80   ASSERT(a->local_candidate().preference() == a->port()->preference());
     81   ASSERT(b->local_candidate().preference() == b->port()->preference());
     82   double a_pref = a->local_candidate().preference()
     83                 * a->remote_candidate().preference();
     84   double b_pref = b->local_candidate().preference()
     85                 * b->remote_candidate().preference();
     86 
     87   // Now check combined preferences. Lower values get sorted last.
     88   if (a_pref > b_pref)
     89     return 1;
     90   if (a_pref < b_pref)
     91     return -1;
     92 
     93   return 0;
     94 }
     95 
     96 // Compare two connections based on their writability and static preferences.
     97 int CompareConnections(cricket::Connection *a, cricket::Connection *b) {
     98   // Sort based on write-state.  Better states have lower values.
     99   if (a->write_state() < b->write_state())
    100     return 1;
    101   if (a->write_state() > b->write_state())
    102     return -1;
    103 
    104   // Compare the candidate information.
    105   return CompareConnectionCandidates(a, b);
    106 }
    107 
    108 // Wraps the comparison connection into a less than operator that puts higher
    109 // priority writable connections first.
    110 class ConnectionCompare {
    111  public:
    112   bool operator()(const cricket::Connection *ca,
    113                   const cricket::Connection *cb) {
    114     cricket::Connection* a = const_cast<cricket::Connection*>(ca);
    115     cricket::Connection* b = const_cast<cricket::Connection*>(cb);
    116 
    117     // Compare first on writability and static preferences.
    118     int cmp = CompareConnections(a, b);
    119     if (cmp > 0)
    120       return true;
    121     if (cmp < 0)
    122       return false;
    123 
    124     // Otherwise, sort based on latency estimate.
    125     return a->rtt() < b->rtt();
    126 
    127     // Should we bother checking for the last connection that last received
    128     // data? It would help rendezvous on the connection that is also receiving
    129     // packets.
    130     //
    131     // TODO: Yes we should definitely do this.  The TCP protocol gains
    132     // efficiency by being used bidirectionally, as opposed to two separate
    133     // unidirectional streams.  This test should probably occur before
    134     // comparison of local prefs (assuming combined prefs are the same).  We
    135     // need to be careful though, not to bounce back and forth with both sides
    136     // trying to rendevous with the other.
    137   }
    138 };
    139 
    140 // Determines whether we should switch between two connections, based first on
    141 // static preferences and then (if those are equal) on latency estimates.
    142 bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) {
    143   if (a_conn == b_conn)
    144     return false;
    145 
    146   if (!a_conn || !b_conn)  // don't think the latter should happen
    147     return true;
    148 
    149   int prefs_cmp = CompareConnections(a_conn, b_conn);
    150   if (prefs_cmp < 0)
    151     return true;
    152   if (prefs_cmp > 0)
    153     return false;
    154 
    155   return b_conn->rtt() <= a_conn->rtt() + kMinImprovement;
    156 }
    157 
    158 }  // unnamed namespace
    159 
    160 namespace cricket {
    161 
    162 P2PTransportChannel::P2PTransportChannel(const std::string &name,
    163                                          const std::string &content_type,
    164                                          P2PTransport* transport,
    165                                          PortAllocator *allocator) :
    166     TransportChannelImpl(name, content_type),
    167     transport_(transport),
    168     allocator_(allocator),
    169     worker_thread_(talk_base::Thread::Current()),
    170     waiting_for_signaling_(false),
    171     error_(0),
    172     best_connection_(NULL),
    173     pinging_started_(false),
    174     sort_dirty_(false),
    175     was_writable_(false),
    176     was_timed_out_(true) {
    177 }
    178 
    179 P2PTransportChannel::~P2PTransportChannel() {
    180   ASSERT(worker_thread_ == talk_base::Thread::Current());
    181 
    182   for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
    183     delete allocator_sessions_[i];
    184 }
    185 
    186 // Add the allocator session to our list so that we know which sessions
    187 // are still active.
    188 void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) {
    189   session->set_generation(static_cast<uint32>(allocator_sessions_.size()));
    190   allocator_sessions_.push_back(session);
    191 
    192   // We now only want to apply new candidates that we receive to the ports
    193   // created by this new session because these are replacing those of the
    194   // previous sessions.
    195   ports_.clear();
    196 
    197   session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady);
    198   session->SignalCandidatesReady.connect(
    199       this, &P2PTransportChannel::OnCandidatesReady);
    200   session->GetInitialPorts();
    201   if (pinging_started_)
    202     session->StartGetAllPorts();
    203 }
    204 
    205 // Go into the state of processing candidates, and running in general
    206 void P2PTransportChannel::Connect() {
    207   ASSERT(worker_thread_ == talk_base::Thread::Current());
    208 
    209   // Kick off an allocator session
    210   Allocate();
    211 
    212   // Start pinging as the ports come in.
    213   thread()->Post(this, MSG_PING);
    214 }
    215 
    216 // Reset the socket, clear up any previous allocations and start over
    217 void P2PTransportChannel::Reset() {
    218   ASSERT(worker_thread_ == talk_base::Thread::Current());
    219 
    220   // Get rid of all the old allocators.  This should clean up everything.
    221   for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
    222     delete allocator_sessions_[i];
    223 
    224   allocator_sessions_.clear();
    225   ports_.clear();
    226   connections_.clear();
    227   best_connection_ = NULL;
    228 
    229   // Forget about all of the candidates we got before.
    230   remote_candidates_.clear();
    231 
    232   // Revert to the initial state.
    233   set_readable(false);
    234   set_writable(false);
    235 
    236   // Reinitialize the rest of our state.
    237   waiting_for_signaling_ = false;
    238   pinging_started_ = false;
    239   sort_dirty_ = false;
    240   was_writable_ = false;
    241   was_timed_out_ = true;
    242 
    243   // If we allocated before, start a new one now.
    244   if (transport_->connect_requested())
    245     Allocate();
    246 
    247   // Start pinging as the ports come in.
    248   thread()->Clear(this);
    249   thread()->Post(this, MSG_PING);
    250 }
    251 
    252 // A new port is available, attempt to make connections for it
    253 void P2PTransportChannel::OnPortReady(PortAllocatorSession *session,
    254                                       Port* port) {
    255   ASSERT(worker_thread_ == talk_base::Thread::Current());
    256 
    257   // Set in-effect options on the new port
    258   for (OptionMap::const_iterator it = options_.begin();
    259        it != options_.end();
    260        ++it) {
    261     int val = port->SetOption(it->first, it->second);
    262     if (val < 0) {
    263       LOG_J(LS_WARNING, port) << "SetOption(" << it->first
    264                               << ", " << it->second
    265                               << ") failed: " << port->GetError();
    266     }
    267   }
    268 
    269   // Remember the ports and candidates, and signal that candidates are ready.
    270   // The session will handle this, and send an initiate/accept/modify message
    271   // if one is pending.
    272 
    273   ports_.push_back(port);
    274   port->SignalUnknownAddress.connect(
    275       this, &P2PTransportChannel::OnUnknownAddress);
    276   port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed);
    277 
    278   // Attempt to create a connection from this new port to all of the remote
    279   // candidates that we were given so far.
    280 
    281   std::vector<RemoteCandidate>::iterator iter;
    282   for (iter = remote_candidates_.begin(); iter != remote_candidates_.end();
    283        ++iter)
    284     CreateConnection(port, *iter, iter->origin_port(), false);
    285 
    286   SortConnections();
    287 }
    288 
    289 // A new candidate is available, let listeners know
    290 void P2PTransportChannel::OnCandidatesReady(
    291     PortAllocatorSession *session, const std::vector<Candidate>& candidates) {
    292   for (size_t i = 0; i < candidates.size(); ++i) {
    293     SignalCandidateReady(this, candidates[i]);
    294   }
    295 }
    296 
    297 // Handle stun packets
    298 void P2PTransportChannel::OnUnknownAddress(
    299     Port *port, const talk_base::SocketAddress &address, StunMessage *stun_msg,
    300     const std::string &remote_username) {
    301   ASSERT(worker_thread_ == talk_base::Thread::Current());
    302 
    303   // Port has received a valid stun packet from an address that no Connection
    304   // is currently available for. See if the remote user name is in the remote
    305   // candidate list. If it isn't return error to the stun request.
    306 
    307   const Candidate *candidate = NULL;
    308   std::vector<RemoteCandidate>::iterator it;
    309   for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) {
    310     if ((*it).username() == remote_username) {
    311       candidate = &(*it);
    312       break;
    313     }
    314   }
    315   if (candidate == NULL) {
    316     // Don't know about this username, the request is bogus
    317     // This sometimes happens if a binding response comes in before the ACCEPT
    318     // message.  It is totally valid; the retry state machine will try again.
    319 
    320     port->SendBindingErrorResponse(stun_msg, address,
    321         STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS);
    322     delete stun_msg;
    323     return;
    324   }
    325 
    326   // Check for connectivity to this address. Create connections
    327   // to this address across all local ports. First, add this as a new remote
    328   // address
    329 
    330   Candidate new_remote_candidate = *candidate;
    331   new_remote_candidate.set_address(address);
    332   // new_remote_candidate.set_protocol(port->protocol());
    333 
    334   // This remote username exists. Now create connections using this candidate,
    335   // and resort
    336 
    337   if (CreateConnections(new_remote_candidate, port, true)) {
    338     // Send the pinger a successful stun response.
    339     port->SendBindingResponse(stun_msg, address);
    340 
    341     // Update the list of connections since we just added another.  We do this
    342     // after sending the response since it could (in principle) delete the
    343     // connection in question.
    344     SortConnections();
    345   } else {
    346     // Hopefully this won't occur, because changing a destination address
    347     // shouldn't cause a new connection to fail
    348     ASSERT(false);
    349     port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR,
    350         STUN_ERROR_REASON_SERVER_ERROR);
    351   }
    352 
    353   delete stun_msg;
    354 }
    355 
    356 void P2PTransportChannel::OnCandidate(const Candidate& candidate) {
    357   ASSERT(worker_thread_ == talk_base::Thread::Current());
    358 
    359   // Create connections to this remote candidate.
    360   CreateConnections(candidate, NULL, false);
    361 
    362   // Resort the connections list, which may have new elements.
    363   SortConnections();
    364 }
    365 
    366 // Creates connections from all of the ports that we care about to the given
    367 // remote candidate.  The return value is true if we created a connection from
    368 // the origin port.
    369 bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate,
    370                                             Port* origin_port,
    371                                             bool readable) {
    372   ASSERT(worker_thread_ == talk_base::Thread::Current());
    373 
    374   // Add a new connection for this candidate to every port that allows such a
    375   // connection (i.e., if they have compatible protocols) and that does not
    376   // already have a connection to an equivalent candidate.  We must be careful
    377   // to make sure that the origin port is included, even if it was pruned,
    378   // since that may be the only port that can create this connection.
    379 
    380   bool created = false;
    381 
    382   std::vector<Port *>::reverse_iterator it;
    383   for (it = ports_.rbegin(); it != ports_.rend(); ++it) {
    384     if (CreateConnection(*it, remote_candidate, origin_port, readable)) {
    385       if (*it == origin_port)
    386         created = true;
    387     }
    388   }
    389 
    390   if ((origin_port != NULL) &&
    391       std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) {
    392     if (CreateConnection(origin_port, remote_candidate, origin_port, readable))
    393       created = true;
    394   }
    395 
    396   // Remember this remote candidate so that we can add it to future ports.
    397   RememberRemoteCandidate(remote_candidate, origin_port);
    398 
    399   return created;
    400 }
    401 
    402 // Setup a connection object for the local and remote candidate combination.
    403 // And then listen to connection object for changes.
    404 bool P2PTransportChannel::CreateConnection(Port* port,
    405                                            const Candidate& remote_candidate,
    406                                            Port* origin_port,
    407                                            bool readable) {
    408   // Look for an existing connection with this remote address.  If one is not
    409   // found, then we can create a new connection for this address.
    410   Connection* connection = port->GetConnection(remote_candidate.address());
    411   if (connection != NULL) {
    412     // It is not legal to try to change any of the parameters of an existing
    413     // connection; however, the other side can send a duplicate candidate.
    414     if (!remote_candidate.IsEquivalent(connection->remote_candidate())) {
    415       LOG(INFO) << "Attempt to change a remote candidate";
    416       return false;
    417     }
    418   } else {
    419     Port::CandidateOrigin origin = GetOrigin(port, origin_port);
    420     connection = port->CreateConnection(remote_candidate, origin);
    421     if (!connection)
    422       return false;
    423 
    424     connections_.push_back(connection);
    425     connection->SignalReadPacket.connect(
    426         this, &P2PTransportChannel::OnReadPacket);
    427     connection->SignalStateChange.connect(
    428         this, &P2PTransportChannel::OnConnectionStateChange);
    429     connection->SignalDestroyed.connect(
    430         this, &P2PTransportChannel::OnConnectionDestroyed);
    431 
    432     LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", ("
    433                          << connections_.size() << " total)";
    434   }
    435 
    436   // If we are readable, it is because we are creating this in response to a
    437   // ping from the other side.  This will cause the state to become readable.
    438   if (readable)
    439     connection->ReceivedPing();
    440 
    441   return true;
    442 }
    443 
    444 // Maintain our remote candidate list, adding this new remote one.
    445 void P2PTransportChannel::RememberRemoteCandidate(
    446     const Candidate& remote_candidate, Port* origin_port) {
    447   // Remove any candidates whose generation is older than this one.  The
    448   // presence of a new generation indicates that the old ones are not useful.
    449   uint32 i = 0;
    450   while (i < remote_candidates_.size()) {
    451     if (remote_candidates_[i].generation() < remote_candidate.generation()) {
    452       LOG(INFO) << "Pruning candidate from old generation: "
    453                 << remote_candidates_[i].address().ToString();
    454       remote_candidates_.erase(remote_candidates_.begin() + i);
    455     } else {
    456       i += 1;
    457     }
    458   }
    459 
    460   // Make sure this candidate is not a duplicate.
    461   for (uint32 i = 0; i < remote_candidates_.size(); ++i) {
    462     if (remote_candidates_[i].IsEquivalent(remote_candidate)) {
    463       LOG(INFO) << "Duplicate candidate: "
    464                 << remote_candidate.address().ToString();
    465       return;
    466     }
    467   }
    468 
    469   // Try this candidate for all future ports.
    470   remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port));
    471 
    472   // We have some candidates from the other side, we are now serious about
    473   // this connection.  Let's do the StartGetAllPorts thing.
    474   if (!pinging_started_) {
    475     pinging_started_ = true;
    476     for (size_t i = 0; i < allocator_sessions_.size(); ++i) {
    477       if (!allocator_sessions_[i]->IsGettingAllPorts())
    478         allocator_sessions_[i]->StartGetAllPorts();
    479     }
    480   }
    481 }
    482 
    483 // Send data to the other side, using our best connection
    484 int P2PTransportChannel::SendPacket(const char *data, size_t len) {
    485   // This can get called on any thread that is convenient to write from!
    486   if (best_connection_ == NULL) {
    487     error_ = EWOULDBLOCK;
    488     return SOCKET_ERROR;
    489   }
    490   int sent = best_connection_->Send(data, len);
    491   if (sent <= 0) {
    492     ASSERT(sent < 0);
    493     error_ = best_connection_->GetError();
    494   }
    495   return sent;
    496 }
    497 
    498 // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending)
    499 void P2PTransportChannel::Allocate() {
    500   CancelPendingAllocate();
    501   // Time for a new allocator, lets make sure we have a signalling channel
    502   // to communicate candidates through first.
    503   waiting_for_signaling_ = true;
    504   SignalRequestSignaling();
    505 }
    506 
    507 // Cancels the pending allocate, if any.
    508 void P2PTransportChannel::CancelPendingAllocate() {
    509   thread()->Clear(this, MSG_ALLOCATE);
    510 }
    511 
    512 // Monitor connection states
    513 void P2PTransportChannel::UpdateConnectionStates() {
    514   uint32 now = talk_base::Time();
    515 
    516   // We need to copy the list of connections since some may delete themselves
    517   // when we call UpdateState.
    518   for (uint32 i = 0; i < connections_.size(); ++i)
    519     connections_[i]->UpdateState(now);
    520 }
    521 
    522 // Prepare for best candidate sorting
    523 void P2PTransportChannel::RequestSort() {
    524   if (!sort_dirty_) {
    525     worker_thread_->Post(this, MSG_SORT);
    526     sort_dirty_ = true;
    527   }
    528 }
    529 
    530 // Sort the available connections to find the best one.  We also monitor
    531 // the number of available connections and the current state so that we
    532 // can possibly kick off more allocators (for more connections).
    533 void P2PTransportChannel::SortConnections() {
    534   ASSERT(worker_thread_ == talk_base::Thread::Current());
    535 
    536   // Make sure the connection states are up-to-date since this affects how they
    537   // will be sorted.
    538   UpdateConnectionStates();
    539 
    540   // Any changes after this point will require a re-sort.
    541   sort_dirty_ = false;
    542 
    543   // Get a list of the networks that we are using.
    544   std::set<talk_base::Network*> networks;
    545   for (uint32 i = 0; i < connections_.size(); ++i)
    546     networks.insert(connections_[i]->port()->network());
    547 
    548   // Find the best alternative connection by sorting.  It is important to note
    549   // that amongst equal preference, writable connections, this will choose the
    550   // one whose estimated latency is lowest.  So it is the only one that we
    551   // need to consider switching to.
    552 
    553   ConnectionCompare cmp;
    554   std::stable_sort(connections_.begin(), connections_.end(), cmp);
    555   Connection* top_connection = NULL;
    556   if (connections_.size() > 0)
    557     top_connection = connections_[0];
    558 
    559   // If necessary, switch to the new choice.
    560   if (ShouldSwitch(best_connection_, top_connection))
    561     SwitchBestConnectionTo(top_connection);
    562 
    563   // We can prune any connection for which there is a writable connection on
    564   // the same network with better or equal prefences.  We leave those with
    565   // better preference just in case they become writable later (at which point,
    566   // we would prune out the current best connection).  We leave connections on
    567   // other networks because they may not be using the same resources and they
    568   // may represent very distinct paths over which we can switch.
    569   std::set<talk_base::Network*>::iterator network;
    570   for (network = networks.begin(); network != networks.end(); ++network) {
    571     Connection* primier = GetBestConnectionOnNetwork(*network);
    572     if (!primier || (primier->write_state() != Connection::STATE_WRITABLE))
    573       continue;
    574 
    575     for (uint32 i = 0; i < connections_.size(); ++i) {
    576       if ((connections_[i] != primier) &&
    577           (connections_[i]->port()->network() == *network) &&
    578           (CompareConnectionCandidates(primier, connections_[i]) >= 0)) {
    579         connections_[i]->Prune();
    580       }
    581     }
    582   }
    583 
    584   // Count the number of connections in the various states.
    585 
    586   int writable = 0;
    587   int write_connect = 0;
    588   int write_timeout = 0;
    589 
    590   for (uint32 i = 0; i < connections_.size(); ++i) {
    591     switch (connections_[i]->write_state()) {
    592     case Connection::STATE_WRITABLE:
    593       ++writable;
    594       break;
    595     case Connection::STATE_WRITE_CONNECT:
    596       ++write_connect;
    597       break;
    598     case Connection::STATE_WRITE_TIMEOUT:
    599       ++write_timeout;
    600       break;
    601     default:
    602       ASSERT(false);
    603     }
    604   }
    605 
    606   if (writable > 0) {
    607     HandleWritable();
    608   } else if (write_connect > 0) {
    609     HandleNotWritable();
    610   } else {
    611     HandleAllTimedOut();
    612   }
    613 
    614   // Update the state of this channel.  This method is called whenever the
    615   // state of any connection changes, so this is a good place to do this.
    616   UpdateChannelState();
    617 
    618   // Notify of connection state change
    619   SignalConnectionMonitor(this);
    620 }
    621 
    622 // Track the best connection, and let listeners know
    623 void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) {
    624   // Note: if conn is NULL, the previous best_connection_ has been destroyed,
    625   // so don't use it.
    626   // use it.
    627   Connection* old_best_connection = best_connection_;
    628   best_connection_ = conn;
    629   if (best_connection_) {
    630     if (old_best_connection) {
    631       LOG_J(LS_INFO, this) << "Previous best connection: "
    632                            << old_best_connection->ToString();
    633     }
    634     LOG_J(LS_INFO, this) << "New best connection: "
    635                          << best_connection_->ToString();
    636     SignalRouteChange(this, best_connection_->remote_candidate().address());
    637   } else {
    638     LOG_J(LS_INFO, this) << "No best connection";
    639   }
    640 }
    641 
    642 void P2PTransportChannel::UpdateChannelState() {
    643   // The Handle* functions already set the writable state.  We'll just double-
    644   // check it here.
    645   bool writable = ((best_connection_ != NULL)  &&
    646       (best_connection_->write_state() ==
    647       Connection::STATE_WRITABLE));
    648   ASSERT(writable == this->writable());
    649   if (writable != this->writable())
    650     LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch";
    651 
    652   bool readable = false;
    653   for (uint32 i = 0; i < connections_.size(); ++i) {
    654     if (connections_[i]->read_state() == Connection::STATE_READABLE)
    655       readable = true;
    656   }
    657   set_readable(readable);
    658 }
    659 
    660 // We checked the status of our connections and we had at least one that
    661 // was writable, go into the writable state.
    662 void P2PTransportChannel::HandleWritable() {
    663   //
    664   // One or more connections writable!
    665   //
    666   if (!writable()) {
    667     for (uint32 i = 0; i < allocator_sessions_.size(); ++i) {
    668       if (allocator_sessions_[i]->IsGettingAllPorts()) {
    669         allocator_sessions_[i]->StopGetAllPorts();
    670       }
    671     }
    672 
    673     // Stop further allocations.
    674     CancelPendingAllocate();
    675   }
    676 
    677   // We're writable, obviously we aren't timed out
    678   was_writable_ = true;
    679   was_timed_out_ = false;
    680   set_writable(true);
    681 }
    682 
    683 // We checked the status of our connections and we didn't have any that
    684 // were writable, go into the connecting state (kick off a new allocator
    685 // session).
    686 void P2PTransportChannel::HandleNotWritable() {
    687   //
    688   // No connections are writable but not timed out!
    689   //
    690   if (was_writable_) {
    691     // If we were writable, let's kick off an allocator session immediately
    692     was_writable_ = false;
    693     Allocate();
    694   }
    695 
    696   // We were connecting, obviously not ALL timed out.
    697   was_timed_out_ = false;
    698   set_writable(false);
    699 }
    700 
    701 // We checked the status of our connections and not only weren't they writable
    702 // but they were also timed out, we really need a new allocator.
    703 void P2PTransportChannel::HandleAllTimedOut() {
    704   //
    705   // No connections... all are timed out!
    706   //
    707   if (!was_timed_out_) {
    708     // We weren't timed out before, so kick off an allocator now (we'll still
    709     // be in the fully timed out state until the allocator actually gives back
    710     // new ports)
    711     Allocate();
    712   }
    713 
    714   // NOTE: we start was_timed_out_ in the true state so that we don't get
    715   // another allocator created WHILE we are in the process of building up
    716   // our first allocator.
    717   was_timed_out_ = true;
    718   was_writable_ = false;
    719   set_writable(false);
    720 }
    721 
    722 // If we have a best connection, return it, otherwise return top one in the
    723 // list (later we will mark it best).
    724 Connection* P2PTransportChannel::GetBestConnectionOnNetwork(
    725     talk_base::Network* network) {
    726   // If the best connection is on this network, then it wins.
    727   if (best_connection_ && (best_connection_->port()->network() == network))
    728     return best_connection_;
    729 
    730   // Otherwise, we return the top-most in sorted order.
    731   for (uint32 i = 0; i < connections_.size(); ++i) {
    732     if (connections_[i]->port()->network() == network)
    733       return connections_[i];
    734   }
    735 
    736   return NULL;
    737 }
    738 
    739 // Handle any queued up requests
    740 void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) {
    741   if (pmsg->message_id == MSG_SORT)
    742     OnSort();
    743   else if (pmsg->message_id == MSG_PING)
    744     OnPing();
    745   else if (pmsg->message_id == MSG_ALLOCATE)
    746     Allocate();
    747   else
    748     ASSERT(false);
    749 }
    750 
    751 // Handle queued up sort request
    752 void P2PTransportChannel::OnSort() {
    753   // Resort the connections based on the new statistics.
    754   SortConnections();
    755 }
    756 
    757 // Handle queued up ping request
    758 void P2PTransportChannel::OnPing() {
    759   // Make sure the states of the connections are up-to-date (since this affects
    760   // which ones are pingable).
    761   UpdateConnectionStates();
    762 
    763   // Find the oldest pingable connection and have it do a ping.
    764   Connection* conn = FindNextPingableConnection();
    765   if (conn)
    766     conn->Ping(talk_base::Time());
    767 
    768   // Post ourselves a message to perform the next ping.
    769   uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY;
    770   thread()->PostDelayed(delay, this, MSG_PING);
    771 }
    772 
    773 // Is the connection in a state for us to even consider pinging the other side?
    774 bool P2PTransportChannel::IsPingable(Connection* conn) {
    775   // An unconnected connection cannot be written to at all, so pinging is out
    776   // of the question.
    777   if (!conn->connected())
    778     return false;
    779 
    780   if (writable()) {
    781     // If we are writable, then we only want to ping connections that could be
    782     // better than this one, i.e., the ones that were not pruned.
    783     return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT);
    784   } else {
    785     // If we are not writable, then we need to try everything that might work.
    786     // This includes both connections that do not have write timeout as well as
    787     // ones that do not have read timeout.  A connection could be readable but
    788     // be in write-timeout if we pruned it before.  Since the other side is
    789     // still pinging it, it very well might still work.
    790     return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) ||
    791            (conn->read_state() != Connection::STATE_READ_TIMEOUT);
    792   }
    793 }
    794 
    795 // Returns the next pingable connection to ping.  This will be the oldest
    796 // pingable connection unless we have a writable connection that is past the
    797 // maximum acceptable ping delay.
    798 Connection* P2PTransportChannel::FindNextPingableConnection() {
    799   uint32 now = talk_base::Time();
    800   if (best_connection_ &&
    801       (best_connection_->write_state() == Connection::STATE_WRITABLE) &&
    802       (best_connection_->last_ping_sent()
    803        + MAX_CURRENT_WRITABLE_DELAY <= now)) {
    804     return best_connection_;
    805   }
    806 
    807   Connection* oldest_conn = NULL;
    808   uint32 oldest_time = 0xFFFFFFFF;
    809   for (uint32 i = 0; i < connections_.size(); ++i) {
    810     if (IsPingable(connections_[i])) {
    811       if (connections_[i]->last_ping_sent() < oldest_time) {
    812         oldest_time = connections_[i]->last_ping_sent();
    813         oldest_conn = connections_[i];
    814       }
    815     }
    816   }
    817   return oldest_conn;
    818 }
    819 
    820 // return the number of "pingable" connections
    821 uint32 P2PTransportChannel::NumPingableConnections() {
    822   uint32 count = 0;
    823   for (uint32 i = 0; i < connections_.size(); ++i) {
    824     if (IsPingable(connections_[i]))
    825       count += 1;
    826   }
    827   return count;
    828 }
    829 
    830 // When a connection's state changes, we need to figure out who to use as
    831 // the best connection again.  It could have become usable, or become unusable.
    832 void P2PTransportChannel::OnConnectionStateChange(Connection *connection) {
    833   ASSERT(worker_thread_ == talk_base::Thread::Current());
    834 
    835   // We have to unroll the stack before doing this because we may be changing
    836   // the state of connections while sorting.
    837   RequestSort();
    838 }
    839 
    840 // When a connection is removed, edit it out, and then update our best
    841 // connection.
    842 void P2PTransportChannel::OnConnectionDestroyed(Connection *connection) {
    843   ASSERT(worker_thread_ == talk_base::Thread::Current());
    844 
    845   // Note: the previous best_connection_ may be destroyed by now, so don't
    846   // use it.
    847 
    848   // Remove this connection from the list.
    849   std::vector<Connection*>::iterator iter =
    850       std::find(connections_.begin(), connections_.end(), connection);
    851   ASSERT(iter != connections_.end());
    852   connections_.erase(iter);
    853 
    854   LOG_J(LS_INFO, this) << "Removed connection ("
    855     << static_cast<int>(connections_.size()) << " remaining)";
    856 
    857   // If this is currently the best connection, then we need to pick a new one.
    858   // The call to SortConnections will pick a new one.  It looks at the current
    859   // best connection in order to avoid switching between fairly similar ones.
    860   // Since this connection is no longer an option, we can just set best to NULL
    861   // and re-choose a best assuming that there was no best connection.
    862   if (best_connection_ == connection) {
    863     SwitchBestConnectionTo(NULL);
    864     RequestSort();
    865   }
    866 }
    867 
    868 // When a port is destroyed remove it from our list of ports to use for
    869 // connection attempts.
    870 void P2PTransportChannel::OnPortDestroyed(Port* port) {
    871   ASSERT(worker_thread_ == talk_base::Thread::Current());
    872 
    873   // Remove this port from the list (if we didn't drop it already).
    874   std::vector<Port*>::iterator iter =
    875       std::find(ports_.begin(), ports_.end(), port);
    876   if (iter != ports_.end())
    877     ports_.erase(iter);
    878 
    879   LOG(INFO) << "Removed port from p2p socket: "
    880             << static_cast<int>(ports_.size()) << " remaining";
    881 }
    882 
    883 // We data is available, let listeners know
    884 void P2PTransportChannel::OnReadPacket(Connection *connection,
    885                                        const char *data, size_t len) {
    886   ASSERT(worker_thread_ == talk_base::Thread::Current());
    887 
    888   // Let the client know of an incoming packet
    889 
    890   SignalReadPacket(this, data, len);
    891 }
    892 
    893 // Set options on ourselves is simply setting options on all of our available
    894 // port objects.
    895 int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) {
    896   OptionMap::iterator it = options_.find(opt);
    897   if (it == options_.end()) {
    898     options_.insert(std::make_pair(opt, value));
    899   } else if (it->second == value) {
    900     return 0;
    901   } else {
    902     it->second = value;
    903   }
    904 
    905   for (uint32 i = 0; i < ports_.size(); ++i) {
    906     int val = ports_[i]->SetOption(opt, value);
    907     if (val < 0) {
    908       // Because this also occurs deferred, probably no point in reporting an
    909       // error
    910       LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: "
    911                    << ports_[i]->GetError();
    912     }
    913   }
    914   return 0;
    915 }
    916 
    917 // When the signalling channel is ready, we can really kick off the allocator
    918 void P2PTransportChannel::OnSignalingReady() {
    919   if (waiting_for_signaling_) {
    920     waiting_for_signaling_ = false;
    921     AddAllocatorSession(allocator_->CreateSession(name(), content_type()));
    922     thread()->PostDelayed(kAllocatePeriod, this, MSG_ALLOCATE);
    923   }
    924 }
    925 
    926 }  // namespace cricket
    927