Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/p2p/base/port.h"
     29 
     30 #include <algorithm>
     31 #include <vector>
     32 
     33 #include "talk/base/helpers.h"
     34 #include "talk/base/logging.h"
     35 #include "talk/base/scoped_ptr.h"
     36 #include "talk/base/stringutils.h"
     37 #include "talk/p2p/base/common.h"
     38 
     39 namespace {
     40 
// Connection state timing parameters.  Every duration below is expressed in
// milliseconds (each one is written as <seconds> * 1000).

// The length of time we wait before timing out readability on a connection.
// Connection::UpdateState() measures this from the last received ping.
const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000;   // 30 seconds

// The length of time we wait before timing out writability on a connection.
// Measured from the oldest ping that has not yet been answered.
const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000;  // 15 seconds

// The length of time we wait before we become unwritable.
// Also measured from the oldest unanswered ping.
const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000;  // 5 seconds

// The number of pings that must fail to respond before we become unwritable.
const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5;

// This is the length of time that we wait for a ping response to come back.
// Returned by ConnectionRequest::GetNextDelay().
const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000;   // 5 seconds
     55 
     56 // Determines whether we have seen at least the given maximum number of
     57 // pings fail to have a response.
     58 inline bool TooManyFailures(
     59     const std::vector<uint32>& pings_since_last_response,
     60     uint32 maximum_failures,
     61     uint32 rtt_estimate,
     62     uint32 now) {
     63 
     64   // If we haven't sent that many pings, then we can't have failed that many.
     65   if (pings_since_last_response.size() < maximum_failures)
     66     return false;
     67 
     68   // Check if the window in which we would expect a response to the ping has
     69   // already elapsed.
     70   return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now;
     71 }
     72 
     73 // Determines whether we have gone too long without seeing any response.
     74 inline bool TooLongWithoutResponse(
     75     const std::vector<uint32>& pings_since_last_response,
     76     uint32 maximum_time,
     77     uint32 now) {
     78 
     79   if (pings_since_last_response.size() == 0)
     80     return false;
     81 
     82   return pings_since_last_response[0] + maximum_time < now;
     83 }
     84 
// We will restrict RTT estimates (when used for determining state) to be
// within a reasonable range.  Values are in milliseconds;
// ConservativeRTTEstimate() clamps to [MINIMUM_RTT, MAXIMUM_RTT].
const uint32 MINIMUM_RTT = 100;   // 0.1 seconds
const uint32 MAXIMUM_RTT = 3000;  // 3 seconds

// When we don't have any RTT data, we have to pick something reasonable.  We
// use a large value just in case the connection is really slow.
// A Connection starts with rtt_ equal to this value, and
// Connection::ToString() prints "-" for any rtt_ that is not below it.
const uint32 DEFAULT_RTT = MAXIMUM_RTT;
     93 
     94 // Computes our estimate of the RTT given the current estimate.
     95 inline uint32 ConservativeRTTEstimate(uint32 rtt) {
     96   return talk_base::_max(MINIMUM_RTT, talk_base::_min(MAXIMUM_RTT, 2 * rtt));
     97 }
     98 
// Weighting of the old rtt value to new data.
const int RTT_RATIO = 3;  // 3 : 1

// The delay before we begin checking if this port is useless.
// Port::Start() posts MSG_CHECKTIMEOUT with this delay.
const int kPortTimeoutDelay = 30 * 1000;  // 30 seconds

// Message ids passed through the thread's message queue.
// NOTE(review): both ids have the value 1.  Port::OnMessage() only expects
// MSG_CHECKTIMEOUT; presumably MSG_DELETE is delivered to a different handler
// so the two cannot be confused -- confirm before relying on the ids to tell
// messages apart.
const uint32 MSG_CHECKTIMEOUT = 1;
const uint32 MSG_DELETE = 1;
    107 }
    108 
    109 namespace cricket {
    110 
// Protocol names, indexed by ProtocolType value.
// NOTE(review): assumes the enum values run 0..PROTO_LAST in the order
// udp, tcp, ssltcp -- confirm against the ProtocolType declaration.
static const char* const PROTO_NAMES[] = { "udp", "tcp", "ssltcp" };

// Returns the name for |proto| by indexing PROTO_NAMES directly.  No range
// check is performed, so |proto| must be a valid index into the table.
const char* ProtoToString(ProtocolType proto) {
  return PROTO_NAMES[proto];
}
    116 
    117 bool StringToProto(const char* value, ProtocolType* proto) {
    118   for (size_t i = 0; i <= PROTO_LAST; ++i) {
    119     if (strcmp(PROTO_NAMES[i], value) == 0) {
    120       *proto = static_cast<ProtocolType>(i);
    121       return true;
    122     }
    123   }
    124   return false;
    125 }
    126 
// Stores the supplied thread, socket factory, network, IP and port range and
// gives the port a username fragment and a password, each obtained from
// talk_base::CreateRandomString(16).
//
// The port starts in LT_PRESTART with preference -1 and port packets
// disabled.  |factory| must not be NULL (checked with ASSERT only).
//
// NOTE(review): generation_ is read by AddAddress() but is not initialized in
// this initializer list -- confirm it is always set (e.g. through a setter)
// before the first candidate is added.
Port::Port(talk_base::Thread* thread, const std::string& type,
           talk_base::PacketSocketFactory* factory, talk_base::Network* network,
           uint32 ip, int min_port, int max_port)
    : thread_(thread),
      factory_(factory),
      type_(type),
      network_(network),
      ip_(ip),
      min_port_(min_port),
      max_port_(max_port),
      preference_(-1),
      lifetime_(LT_PRESTART),
      enable_port_packets_(false) {
  ASSERT(factory_ != NULL);

  set_username_fragment(talk_base::CreateRandomString(16));
  set_password(talk_base::CreateRandomString(16));
  LOG_J(LS_INFO, this) << "Port created";
}
    146 
    147 Port::~Port() {
    148   // Delete all of the remaining connections.  We copy the list up front
    149   // because each deletion will cause it to be modified.
    150 
    151   std::vector<Connection*> list;
    152 
    153   AddressMap::iterator iter = connections_.begin();
    154   while (iter != connections_.end()) {
    155     list.push_back(iter->second);
    156     ++iter;
    157   }
    158 
    159   for (uint32 i = 0; i < list.size(); i++)
    160     delete list[i];
    161 }
    162 
    163 Connection* Port::GetConnection(const talk_base::SocketAddress& remote_addr) {
    164   AddressMap::const_iterator iter = connections_.find(remote_addr);
    165   if (iter != connections_.end())
    166     return iter->second;
    167   else
    168     return NULL;
    169 }
    170 
    171 void Port::AddAddress(const talk_base::SocketAddress& address,
    172                       const std::string& protocol,
    173                       bool final) {
    174   Candidate c;
    175   c.set_name(name_);
    176   c.set_type(type_);
    177   c.set_protocol(protocol);
    178   c.set_address(address);
    179   c.set_preference(preference_);
    180   c.set_username(username_frag_);
    181   c.set_password(password_);
    182   c.set_network_name(network_->name());
    183   c.set_generation(generation_);
    184   candidates_.push_back(c);
    185 
    186   if (final)
    187     SignalAddressReady(this);
    188 }
    189 
// Registers |conn| under its remote candidate's address, subscribes to its
// SignalDestroyed so the entry can be removed again, and then announces the
// new connection through SignalConnectionCreated.
//
// Note that an existing entry for the same remote address is overwritten by
// the map assignment; the previous Connection pointer is not deleted here.
void Port::AddConnection(Connection* conn) {
  connections_[conn->remote_candidate().address()] = conn;
  conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed);
  SignalConnectionCreated(this, conn);
}
    195 
    196 void Port::OnReadPacket(
    197     const char* data, size_t size, const talk_base::SocketAddress& addr) {
    198   // If the user has enabled port packets, just hand this over.
    199   if (enable_port_packets_) {
    200     SignalReadPacket(this, data, size, addr);
    201     return;
    202   }
    203 
    204   // If this is an authenticated STUN request, then signal unknown address and
    205   // send back a proper binding response.
    206   StunMessage* msg;
    207   std::string remote_username;
    208   if (!GetStunMessage(data, size, addr, &msg, &remote_username)) {
    209     LOG_J(LS_ERROR, this) << "Received non-STUN packet from unknown address ("
    210                           << addr.ToString() << ")";
    211   } else if (!msg) {
    212     // STUN message handled already
    213   } else if (msg->type() == STUN_BINDING_REQUEST) {
    214     SignalUnknownAddress(this, addr, msg, remote_username);
    215   } else {
    216     // NOTE(tschmelcher): This is benign. It occurs if we pruned a
    217     // connection for this port while it had STUN requests in flight, because
    218     // we then get back responses for them, which this code correctly does not
    219     // handle.
    220     LOG_J(LS_ERROR, this) << "Received unexpected STUN message type ("
    221                           << msg->type() << ") from unknown address ("
    222                           << addr.ToString() << ")";
    223     delete msg;
    224   }
    225 }
    226 
    227 bool Port::GetStunMessage(const char* data, size_t size,
    228                           const talk_base::SocketAddress& addr,
    229                           StunMessage** out_msg, std::string* out_username) {
    230   // NOTE: This could clearly be optimized to avoid allocating any memory.
    231   //       However, at the data rates we'll be looking at on the client side,
    232   //       this probably isn't worth worrying about.
    233   ASSERT(out_msg != NULL);
    234   ASSERT(out_username != NULL);
    235   *out_msg = NULL;
    236   out_username->clear();
    237 
    238   // Parse the request message.  If the packet is not a complete and correct
    239   // STUN message, then ignore it.
    240   talk_base::scoped_ptr<StunMessage> stun_msg(new StunMessage());
    241   talk_base::ByteBuffer buf(data, size);
    242   if (!stun_msg->Read(&buf) || (buf.Length() > 0)) {
    243     return false;
    244   }
    245 
    246   // The packet must include a username that either begins or ends with our
    247   // fragment.  It should begin with our fragment if it is a request and it
    248   // should end with our fragment if it is a response.
    249   const StunByteStringAttribute* username_attr =
    250       stun_msg->GetByteString(STUN_ATTR_USERNAME);
    251 
    252   int remote_frag_len = (username_attr ? username_attr->length() : 0);
    253   remote_frag_len -= static_cast<int>(username_frag_.size());
    254 
    255   if (stun_msg->type() == STUN_BINDING_REQUEST) {
    256     if (remote_frag_len < 0) {
    257       // Username not present or corrupted, don't reply.
    258       LOG_J(LS_ERROR, this) << "Received STUN request without username from "
    259                             << addr.ToString();
    260       return true;
    261     } else if (std::memcmp(username_attr->bytes(), username_frag_.c_str(),
    262                            username_frag_.size()) != 0) {
    263       LOG_J(LS_ERROR, this) << "Received STUN request with bad local username "
    264                             << std::string(username_attr->bytes(),
    265                                            username_attr->length()) << " from "
    266                             << addr.ToString();
    267       SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_BAD_REQUEST,
    268                                STUN_ERROR_REASON_BAD_REQUEST);
    269       return true;
    270     }
    271 
    272     out_username->assign(username_attr->bytes() + username_frag_.size(),
    273                          username_attr->bytes() + username_attr->length());
    274   } else if ((stun_msg->type() == STUN_BINDING_RESPONSE)
    275       || (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) {
    276     if (remote_frag_len < 0) {
    277       LOG_J(LS_ERROR, this) << "Received STUN response without username from "
    278                             << addr.ToString();
    279       // Do not send error response to a response
    280       return true;
    281     } else if (std::memcmp(username_attr->bytes() + remote_frag_len,
    282                            username_frag_.c_str(),
    283                            username_frag_.size()) != 0) {
    284       LOG_J(LS_ERROR, this) << "Received STUN response with bad local username "
    285                             << std::string(username_attr->bytes(),
    286                                            username_attr->length()) << " from "
    287                             << addr.ToString();
    288       // Do not send error response to a response
    289       return true;
    290     }
    291 
    292     out_username->assign(username_attr->bytes(),
    293                          username_attr->bytes() + remote_frag_len);
    294 
    295     if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) {
    296       if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) {
    297         LOG_J(LS_ERROR, this) << "Received STUN binding error:"
    298                               << " class="
    299                               << static_cast<int>(error_code->error_class())
    300                               << " number="
    301                               << static_cast<int>(error_code->number())
    302                               << " reason='" << error_code->reason() << "'"
    303                               << " from " << addr.ToString();
    304         // Return message to allow error-specific processing
    305       } else {
    306         LOG_J(LS_ERROR, this) << "Received STUN binding error without a error "
    307                               << "code from " << addr.ToString();
    308         // Drop corrupt message
    309         return true;
    310       }
    311     }
    312   } else {
    313     LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type ("
    314                           << stun_msg->type() << ") from " << addr.ToString();
    315     return true;
    316   }
    317 
    318   // Return the STUN message found.
    319   *out_msg = stun_msg.release();
    320   return true;
    321 }
    322 
    323 void Port::SendBindingResponse(StunMessage* request,
    324                                const talk_base::SocketAddress& addr) {
    325   ASSERT(request->type() == STUN_BINDING_REQUEST);
    326 
    327   // Retrieve the username from the request.
    328   const StunByteStringAttribute* username_attr =
    329       request->GetByteString(STUN_ATTR_USERNAME);
    330   ASSERT(username_attr != NULL);
    331   if (username_attr == NULL) {
    332     // No valid username, skip the response.
    333     return;
    334   }
    335 
    336   // Fill in the response message.
    337   StunMessage response;
    338   response.SetType(STUN_BINDING_RESPONSE);
    339   response.SetTransactionID(request->transaction_id());
    340 
    341   StunByteStringAttribute* username2_attr =
    342       StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
    343   username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
    344   response.AddAttribute(username2_attr);
    345 
    346   StunAddressAttribute* addr_attr =
    347       StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS);
    348   addr_attr->SetFamily(1);
    349   addr_attr->SetPort(addr.port());
    350   addr_attr->SetIP(addr.ip());
    351   response.AddAttribute(addr_attr);
    352 
    353   // Send the response message.
    354   // NOTE: If we wanted to, this is where we would add the HMAC.
    355   talk_base::ByteBuffer buf;
    356   response.Write(&buf);
    357   if (SendTo(buf.Data(), buf.Length(), addr, false) < 0) {
    358     LOG_J(LS_ERROR, this) << "Failed to send STUN ping response to "
    359                           << addr.ToString();
    360   }
    361 
    362   // The fact that we received a successful request means that this connection
    363   // (if one exists) should now be readable.
    364   Connection* conn = GetConnection(addr);
    365   ASSERT(conn != NULL);
    366   if (conn)
    367     conn->ReceivedPing();
    368 }
    369 
    370 void Port::SendBindingErrorResponse(StunMessage* request,
    371                                     const talk_base::SocketAddress& addr,
    372                                     int error_code, const std::string& reason) {
    373   ASSERT(request->type() == STUN_BINDING_REQUEST);
    374 
    375   // Retrieve the username from the request. If it didn't have one, we
    376   // shouldn't be responding at all.
    377   const StunByteStringAttribute* username_attr =
    378       request->GetByteString(STUN_ATTR_USERNAME);
    379   ASSERT(username_attr != NULL);
    380   if (username_attr == NULL) {
    381     // No valid username, skip the response.
    382     return;
    383   }
    384 
    385   // Fill in the response message.
    386   StunMessage response;
    387   response.SetType(STUN_BINDING_ERROR_RESPONSE);
    388   response.SetTransactionID(request->transaction_id());
    389 
    390   StunByteStringAttribute* username2_attr =
    391       StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
    392   username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
    393   response.AddAttribute(username2_attr);
    394 
    395   StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode();
    396   error_attr->SetErrorCode(error_code);
    397   error_attr->SetReason(reason);
    398   response.AddAttribute(error_attr);
    399 
    400   // Send the response message.
    401   // NOTE: If we wanted to, this is where we would add the HMAC.
    402   talk_base::ByteBuffer buf;
    403   response.Write(&buf);
    404   SendTo(buf.Data(), buf.Length(), addr, false);
    405   LOG_J(LS_INFO, this) << "Sending STUN binding error: reason=" << reason
    406                        << " to " << addr.ToString();
    407 }
    408 
// Handles the delayed MSG_CHECKTIMEOUT posted by Start(): moves the port
// from LT_PRETIMEOUT to LT_POSTTIMEOUT and then runs CheckTimeout(), which
// destroys the port if it has no connections.  Both the message id and the
// current lifetime state are checked with ASSERT only.
void Port::OnMessage(talk_base::Message *pmsg) {
  ASSERT(pmsg->message_id == MSG_CHECKTIMEOUT);
  ASSERT(lifetime_ == LT_PRETIMEOUT);
  lifetime_ = LT_POSTTIMEOUT;
  CheckTimeout();
}
    415 
    416 std::string Port::ToString() const {
    417   std::stringstream ss;
    418   ss << "Port[" << name_ << ":" << type_ << ":" << network_->ToString() << "]";
    419   return ss.str();
    420 }
    421 
// Turns on port-packet mode.  From then on Port::OnReadPacket() forwards
// every incoming packet through SignalReadPacket instead of parsing it as
// STUN.  Nothing in the visible code turns the mode off again.
void Port::EnablePortPackets() {
  enable_port_packets_ = true;
}
    425 
    426 void Port::Start() {
    427   // The port sticks around for a minimum lifetime, after which
    428   // we destroy it when it drops to zero connections.
    429   if (lifetime_ == LT_PRESTART) {
    430     lifetime_ = LT_PRETIMEOUT;
    431     thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT);
    432   } else {
    433     LOG_J(LS_WARNING, this) << "Port restart attempted";
    434   }
    435 }
    436 
    437 void Port::OnConnectionDestroyed(Connection* conn) {
    438   AddressMap::iterator iter =
    439       connections_.find(conn->remote_candidate().address());
    440   ASSERT(iter != connections_.end());
    441   connections_.erase(iter);
    442 
    443   CheckTimeout();
    444 }
    445 
// Announces the port's destruction through SignalDestroyed and then deletes
// the port.  Expects all connections to be gone already (ASSERT only).
void Port::Destroy() {
  ASSERT(connections_.empty());
  LOG_J(LS_INFO, this) << "Port deleted";
  SignalDestroyed(this);
  // Must stay the last statement: no member may be touched after this.
  delete this;
}
    452 
    453 void Port::CheckTimeout() {
    454   // If this port has no connections, then there's no reason to keep it around.
    455   // When the connections time out (both read and write), they will delete
    456   // themselves, so if we have any connections, they are either readable or
    457   // writable (or still connecting).
    458   if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) {
    459     Destroy();
    460   }
    461 }
    462 
    463 // A ConnectionRequest is a simple STUN ping used to determine writability.
    464 class ConnectionRequest : public StunRequest {
    465  public:
    466   explicit ConnectionRequest(Connection* connection) : connection_(connection) {
    467   }
    468 
    469   virtual ~ConnectionRequest() {
    470   }
    471 
    472   virtual void Prepare(StunMessage* request) {
    473     request->SetType(STUN_BINDING_REQUEST);
    474     StunByteStringAttribute* username_attr =
    475         StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
    476     std::string username = connection_->remote_candidate().username();
    477     username.append(connection_->port()->username_fragment());
    478     username_attr->CopyBytes(username.c_str(), username.size());
    479     request->AddAttribute(username_attr);
    480   }
    481 
    482   virtual void OnResponse(StunMessage* response) {
    483     connection_->OnConnectionRequestResponse(this, response);
    484   }
    485 
    486   virtual void OnErrorResponse(StunMessage* response) {
    487     connection_->OnConnectionRequestErrorResponse(this, response);
    488   }
    489 
    490   virtual void OnTimeout() {
    491     connection_->OnConnectionRequestTimeout(this);
    492   }
    493 
    494   virtual int GetNextDelay() {
    495     // Each request is sent only once.  After a single delay , the request will
    496     // time out.
    497     timeout_ = true;
    498     return CONNECTION_RESPONSE_TIMEOUT;
    499   }
    500 
    501  private:
    502   Connection* connection_;
    503 };
    504 
    505 //
    506 // Connection
    507 //
    508 
// Creates a connection from |port|'s local candidate number |index| to
// |remote_candidate|.
//
// A new connection starts out not readable (STATE_READ_TIMEOUT), in
// STATE_WRITE_CONNECT, connected, not pruned and with rtt_ set to
// DEFAULT_RTT.  Its STUN request manager runs on the port's thread.
Connection::Connection(Port* port, size_t index,
                       const Candidate& remote_candidate)
  : port_(port), local_candidate_index_(index),
    remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT),
    write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false),
    requests_(port->thread()), rtt_(DEFAULT_RTT),
    last_ping_sent_(0), last_ping_received_(0), last_data_received_(0),
    reported_(false) {
  // Wire up to send stun packets
  requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket);
  LOG_J(LS_INFO, this) << "Connection created";
}
    521 
// Nothing to release explicitly; in particular no signal is fired from here.
Connection::~Connection() {
}
    524 
    525 const Candidate& Connection::local_candidate() const {
    526   if (local_candidate_index_ < port_->candidates().size())
    527     return port_->candidates()[local_candidate_index_];
    528   ASSERT(false);
    529   static Candidate foo;
    530   return foo;
    531 }
    532 
    533 void Connection::set_read_state(ReadState value) {
    534   ReadState old_value = read_state_;
    535   read_state_ = value;
    536   if (value != old_value) {
    537     LOG_J(LS_VERBOSE, this) << "set_read_state";
    538     SignalStateChange(this);
    539     CheckTimeout();
    540   }
    541 }
    542 
    543 void Connection::set_write_state(WriteState value) {
    544   WriteState old_value = write_state_;
    545   write_state_ = value;
    546   if (value != old_value) {
    547     LOG_J(LS_VERBOSE, this) << "set_write_state";
    548     SignalStateChange(this);
    549     CheckTimeout();
    550   }
    551 }
    552 
    553 void Connection::set_connected(bool value) {
    554   bool old_value = connected_;
    555   connected_ = value;
    556   if (value != old_value) {
    557     LOG_J(LS_VERBOSE, this) << "set_connected";
    558   }
    559 }
    560 
    561 void Connection::OnSendStunPacket(const void* data, size_t size,
    562                                   StunRequest* req) {
    563   if (port_->SendTo(data, size, remote_candidate_.address(), false) < 0) {
    564     LOG_J(LS_WARNING, this) << "Failed to send STUN ping " << req->id();
    565   }
    566 }
    567 
// Handles a packet received from this connection's remote address.
//
// Non-STUN packets are passed up through SignalReadPacket, but only while the
// connection is readable.  STUN packets are validated against the expected
// usernames; requests are answered (which also marks the connection readable)
// and responses are matched against outstanding pings.  Every StunMessage
// returned by GetStunMessage() is deleted before this function returns.
void Connection::OnReadPacket(const char* data, size_t size) {
  StunMessage* msg;
  std::string remote_username;
  const talk_base::SocketAddress& addr(remote_candidate_.address());
  if (!port_->GetStunMessage(data, size, addr, &msg, &remote_username)) {
    // The packet did not parse as a valid STUN message

    // If this connection is readable, then pass along the packet.
    if (read_state_ == STATE_READABLE) {
      // readable means data from this address is acceptable
      // Send it on!

      last_data_received_ = talk_base::Time();
      recv_rate_tracker_.Update(size);
      SignalReadPacket(this, data, size);

      // If timed out sending writability checks, start up again
      if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
        set_write_state(STATE_WRITE_CONNECT);
    } else {
      // Not readable means the remote address hasn't sent a valid
      // binding request yet.

      LOG_J(LS_WARNING, this)
        << "Received non-STUN packet from an unreadable connection.";
    }
  } else if (!msg) {
    // The packet was STUN, but was already handled internally.
  } else if (remote_username != remote_candidate_.username()) {
    // The packet had the right local username, but the remote username was
    // not the right one for the remote address.
    if (msg->type() == STUN_BINDING_REQUEST) {
      LOG_J(LS_ERROR, this) << "Received STUN request with bad remote username "
                            << remote_username;
      port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST,
                                      STUN_ERROR_REASON_BAD_REQUEST);
    } else if (msg->type() == STUN_BINDING_RESPONSE ||
               msg->type() == STUN_BINDING_ERROR_RESPONSE) {
      // Responses are never answered with an error; just log.
      LOG_J(LS_ERROR, this) << "Received STUN response with bad remote username"
                            " " << remote_username;
    }
    delete msg;
  } else {
    // The packet is STUN, with the right username.
    // If this is a STUN request, then update the readable bit and respond.
    // If this is a STUN response, then update the writable bit.

    switch (msg->type()) {
    case STUN_BINDING_REQUEST:
      // Incoming, validated stun request from remote peer.
      // This call will also set the connection readable.

      port_->SendBindingResponse(msg, addr);

      // If timed out sending writability checks, start up again
      if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
        set_write_state(STATE_WRITE_CONNECT);
      break;

    case STUN_BINDING_RESPONSE:
    case STUN_BINDING_ERROR_RESPONSE:
      // Response from remote peer. Does it match request sent?
      // This doesn't just check, it makes callbacks if transaction
      // id's match
      requests_.CheckResponse(msg);
      break;

    default:
      // GetStunMessage() only returns the three types handled above.
      ASSERT(false);
      break;
    }

    // Done with the message; delete

    delete msg;
  }
}
    645 
    646 void Connection::Prune() {
    647   if (!pruned_) {
    648     LOG_J(LS_VERBOSE, this) << "Connection pruned";
    649     pruned_ = true;
    650     requests_.Clear();
    651     set_write_state(STATE_WRITE_TIMEOUT);
    652   }
    653 }
    654 
// Forces both the read and the write state to their timed-out values.  Each
// setter calls CheckTimeout() when the state actually changes.
// NOTE(review): presumably CheckTimeout() is what triggers the actual
// deletion once both states are timed out -- its body is not visible here.
void Connection::Destroy() {
  LOG_J(LS_VERBOSE, this) << "Connection destroyed";
  set_read_state(STATE_READ_TIMEOUT);
  set_write_state(STATE_WRITE_TIMEOUT);
}
    660 
// Re-evaluates the read and write states against the current time |now|.
//
//  - A readable connection becomes STATE_READ_TIMEOUT once no ping has been
//    received for CONNECTION_READ_TIMEOUT.
//  - A writable connection drops to STATE_WRITE_CONNECT after
//    CONNECTION_WRITE_CONNECT_FAILURES unanswered pings and
//    CONNECTION_WRITE_CONNECT_TIMEOUT without a response.
//  - A connection in STATE_WRITE_CONNECT becomes STATE_WRITE_TIMEOUT after
//    CONNECTION_WRITE_TIMEOUT without a response.
//
// The two write checks run back to back, so a single call can take a
// connection from writable all the way to timed out.
void Connection::UpdateState(uint32 now) {
  uint32 rtt = ConservativeRTTEstimate(rtt_);

  // Render the outstanding ping times for the verbose log line below.
  std::string pings;
  for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
    char buf[32];
    talk_base::sprintfn(buf, sizeof(buf), "%u",
        pings_since_last_response_[i]);
    pings.append(buf).append(" ");
  }
  LOG_J(LS_VERBOSE, this) << "UpdateState(): pings_since_last_response_=" <<
      pings << ", rtt=" << rtt << ", now=" << now;

  // Check the readable state.
  //
  // Since we don't know how many pings the other side has attempted, the best
  // test we can do is a simple window.

  if ((read_state_ == STATE_READABLE) &&
      (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) {
    LOG_J(LS_INFO, this) << "Unreadable after "
                         << now - last_ping_received_
                         << " ms without a ping, rtt=" << rtt;
    set_read_state(STATE_READ_TIMEOUT);
  }

  // Check the writable state.  (The order of these checks is important.)
  //
  // Before becoming unwritable, we allow for a fixed number of pings to fail
  // (i.e., receive no response).  We also have to give the response time to
  // get back, so we include a conservative estimate of this.
  //
  // Before timing out writability, we give a fixed amount of time.  This is to
  // allow for changes in network conditions.

  if ((write_state_ == STATE_WRITABLE) &&
      TooManyFailures(pings_since_last_response_,
                      CONNECTION_WRITE_CONNECT_FAILURES,
                      rtt,
                      now) &&
      TooLongWithoutResponse(pings_since_last_response_,
                             CONNECTION_WRITE_CONNECT_TIMEOUT,
                             now)) {
    // pings_since_last_response_ is non-empty here: both helpers return
    // false for an empty list.
    uint32 max_pings = CONNECTION_WRITE_CONNECT_FAILURES;
    LOG_J(LS_INFO, this) << "Unwritable after " << max_pings
                         << " ping failures and "
                         << now - pings_since_last_response_[0]
                         << " ms without a response,"
                         << " ms since last received ping="
                         << now - last_ping_received_
                         << " ms since last received data="
                         << now - last_data_received_
                         << " rtt=" << rtt;
    set_write_state(STATE_WRITE_CONNECT);
  }

  if ((write_state_ == STATE_WRITE_CONNECT) &&
      TooLongWithoutResponse(pings_since_last_response_,
                             CONNECTION_WRITE_TIMEOUT,
                             now)) {
    LOG_J(LS_INFO, this) << "Timed out after "
                         << now - pings_since_last_response_[0]
                         << " ms without a response, rtt=" << rtt;
    set_write_state(STATE_WRITE_TIMEOUT);
  }
}
    727 
    728 void Connection::Ping(uint32 now) {
    729   ASSERT(connected_);
    730   last_ping_sent_ = now;
    731   pings_since_last_response_.push_back(now);
    732   ConnectionRequest *req = new ConnectionRequest(this);
    733   LOG_J(LS_VERBOSE, this) << "Sending STUN ping " << req->id() << " at " << now;
    734   requests_.Send(req);
    735 }
    736 
    737 void Connection::ReceivedPing() {
    738   last_ping_received_ = talk_base::Time();
    739   set_read_state(STATE_READABLE);
    740 }
    741 
    742 std::string Connection::ToString() const {
    743   const char CONNECT_STATE_ABBREV[2] = {
    744     '-',  // not connected (false)
    745     'C',  // connected (true)
    746   };
    747   const char READ_STATE_ABBREV[2] = {
    748     'R',  // STATE_READABLE
    749     '-',  // STATE_READ_TIMEOUT
    750   };
    751   const char WRITE_STATE_ABBREV[3] = {
    752     'W',  // STATE_WRITABLE
    753     'w',  // STATE_WRITE_CONNECT
    754     '-',  // STATE_WRITE_TIMEOUT
    755   };
    756   const Candidate& local = local_candidate();
    757   const Candidate& remote = remote_candidate();
    758   std::stringstream ss;
    759   ss << "Conn[" << local.generation()
    760      << ":" << local.name() << ":" << local.type() << ":"
    761      << local.protocol() << ":" << local.address().ToString()
    762      << "->" << remote.name() << ":" << remote.type() << ":"
    763      << remote.protocol() << ":" << remote.address().ToString()
    764      << "|"
    765      << CONNECT_STATE_ABBREV[connected()]
    766      << READ_STATE_ABBREV[read_state()]
    767      << WRITE_STATE_ABBREV[write_state()]
    768      << "|";
    769   if (rtt_ < DEFAULT_RTT) {
    770     ss << rtt_ << "]";
    771   } else {
    772     ss << "-]";
    773   }
    774   return ss.str();
    775 }
    776 
    777 void Connection::OnConnectionRequestResponse(ConnectionRequest* request,
    778                                              StunMessage* response) {
    779   // We've already validated that this is a STUN binding response with
    780   // the correct local and remote username for this connection.
    781   // So if we're not already, become writable. We may be bringing a pruned
    782   // connection back to life, but if we don't really want it, we can always
    783   // prune it again.
    784   uint32 rtt = request->Elapsed();
    785   set_write_state(STATE_WRITABLE);
    786 
    787   std::string pings;
    788   for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
    789     char buf[32];
    790     talk_base::sprintfn(buf, sizeof(buf), "%u",
    791         pings_since_last_response_[i]);
    792     pings.append(buf).append(" ");
    793   }
    794 
    795   LOG_J(LS_VERBOSE, this) << "Received STUN ping response " << request->id()
    796                           << ", pings_since_last_response_=" << pings
    797                           << ", rtt=" << rtt;
    798 
    799   pings_since_last_response_.clear();
    800   rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1);
    801 }
    802 
    803 void Connection::OnConnectionRequestErrorResponse(ConnectionRequest* request,
    804                                                   StunMessage* response) {
    805   const StunErrorCodeAttribute* error = response->GetErrorCode();
    806   uint32 error_code = error ?
    807       error->error_code() : static_cast<uint32>(STUN_ERROR_GLOBAL_FAILURE);
    808 
    809   if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE)
    810       || (error_code == STUN_ERROR_SERVER_ERROR)
    811       || (error_code == STUN_ERROR_UNAUTHORIZED)) {
    812     // Recoverable error, retry
    813   } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) {
    814     // Race failure, retry
    815   } else {
    816     // This is not a valid connection.
    817     LOG_J(LS_ERROR, this) << "Received STUN error response, code="
    818                           << error_code << "; killing connection";
    819     set_write_state(STATE_WRITE_TIMEOUT);
    820   }
    821 }
    822 
    823 void Connection::OnConnectionRequestTimeout(ConnectionRequest* request) {
    824   // Log at LS_INFO if we miss a ping on a writable connection.
    825   talk_base::LoggingSeverity sev = (write_state_ == STATE_WRITABLE) ?
    826       talk_base::LS_INFO : talk_base::LS_VERBOSE;
    827   uint32 when = talk_base::Time() - request->Elapsed();
    828   size_t failures;
    829   for (failures = 0; failures < pings_since_last_response_.size(); ++failures) {
    830     if (pings_since_last_response_[failures] > when) {
    831       break;
    832     }
    833   }
    834   LOG_JV(sev, this) << "Timing-out STUN ping " << request->id()
    835                     << " after " << request->Elapsed()
    836                     << " ms, failures=" << failures;
    837 }
    838 
    839 void Connection::CheckTimeout() {
    840   // If both read and write have timed out, then this connection can contribute
    841   // no more to p2p socket unless at some later date readability were to come
    842   // back.  However, we gave readability a long time to timeout, so at this
    843   // point, it seems fair to get rid of this connection.
    844   if ((read_state_ == STATE_READ_TIMEOUT) &&
    845       (write_state_ == STATE_WRITE_TIMEOUT)) {
    846     port_->thread()->Post(this, MSG_DELETE);
    847   }
    848 }
    849 
    850 void Connection::OnMessage(talk_base::Message *pmsg) {
    851   ASSERT(pmsg->message_id == MSG_DELETE);
    852 
    853   LOG_J(LS_INFO, this) << "Connection deleted";
    854   SignalDestroyed(this);
    855   delete this;
    856 }
    857 
    858 size_t Connection::recv_bytes_second() {
    859   return recv_rate_tracker_.units_second();
    860 }
    861 
    862 size_t Connection::recv_total_bytes() {
    863   return recv_rate_tracker_.total_units();
    864 }
    865 
    866 size_t Connection::sent_bytes_second() {
    867   return send_rate_tracker_.units_second();
    868 }
    869 
    870 size_t Connection::sent_total_bytes() {
    871   return send_rate_tracker_.total_units();
    872 }
    873 
    874 ProxyConnection::ProxyConnection(Port* port, size_t index,
    875                                  const Candidate& candidate)
    876   : Connection(port, index, candidate), error_(0) {
    877 }
    878 
    879 int ProxyConnection::Send(const void* data, size_t size) {
    880   if (write_state() != STATE_WRITABLE) {
    881     error_ = EWOULDBLOCK;
    882     return SOCKET_ERROR;
    883   }
    884   int sent = port_->SendTo(data, size, remote_candidate_.address(), true);
    885   if (sent <= 0) {
    886     ASSERT(sent < 0);
    887     error_ = port_->GetError();
    888   } else {
    889     send_rate_tracker_.Update(sent);
    890   }
    891   return sent;
    892 }
    893 
    894 }  // namespace cricket
    895