Home | History | Annotate | Download | only in client
      1 /*
      2  * libjingle
      3  * Copyright 2012, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/examples/peerconnection/client/peer_connection_client.h"
     29 
     30 #include "talk/examples/peerconnection/client/defaults.h"
     31 #include "talk/base/common.h"
     32 #include "talk/base/nethelpers.h"
     33 #include "talk/base/logging.h"
     34 #include "talk/base/stringutils.h"
     35 
     36 #ifdef WIN32
     37 #include "talk/base/win32socketserver.h"
     38 #endif
     39 
     40 using talk_base::sprintfn;
     41 
     42 namespace {
     43 
     44 // This is our magical hangup signal.
     45 const char kByeMessage[] = "BYE";
     46 // Delay between server connection retries, in milliseconds
     47 const int kReconnectDelay = 2000;
     48 
     49 talk_base::AsyncSocket* CreateClientSocket(int family) {
     50 #ifdef WIN32
     51   talk_base::Win32Socket* sock = new talk_base::Win32Socket();
     52   sock->CreateT(family, SOCK_STREAM);
     53   return sock;
     54 #elif defined(POSIX)
     55   talk_base::Thread* thread = talk_base::Thread::Current();
     56   ASSERT(thread != NULL);
     57   return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
     58 #else
     59 #error Platform not supported.
     60 #endif
     61 }
     62 
     63 }
     64 
     65 PeerConnectionClient::PeerConnectionClient()
     66   : callback_(NULL),
     67     resolver_(NULL),
     68     state_(NOT_CONNECTED),
     69     my_id_(-1) {
     70 }
     71 
     72 PeerConnectionClient::~PeerConnectionClient() {
     73 }
     74 
     75 void PeerConnectionClient::InitSocketSignals() {
     76   ASSERT(control_socket_.get() != NULL);
     77   ASSERT(hanging_get_.get() != NULL);
     78   control_socket_->SignalCloseEvent.connect(this,
     79       &PeerConnectionClient::OnClose);
     80   hanging_get_->SignalCloseEvent.connect(this,
     81       &PeerConnectionClient::OnClose);
     82   control_socket_->SignalConnectEvent.connect(this,
     83       &PeerConnectionClient::OnConnect);
     84   hanging_get_->SignalConnectEvent.connect(this,
     85       &PeerConnectionClient::OnHangingGetConnect);
     86   control_socket_->SignalReadEvent.connect(this,
     87       &PeerConnectionClient::OnRead);
     88   hanging_get_->SignalReadEvent.connect(this,
     89       &PeerConnectionClient::OnHangingGetRead);
     90 }
     91 
     92 int PeerConnectionClient::id() const {
     93   return my_id_;
     94 }
     95 
     96 bool PeerConnectionClient::is_connected() const {
     97   return my_id_ != -1;
     98 }
     99 
    100 const Peers& PeerConnectionClient::peers() const {
    101   return peers_;
    102 }
    103 
    104 void PeerConnectionClient::RegisterObserver(
    105     PeerConnectionClientObserver* callback) {
    106   ASSERT(!callback_);
    107   callback_ = callback;
    108 }
    109 
    110 void PeerConnectionClient::Connect(const std::string& server, int port,
    111                                    const std::string& client_name) {
    112   ASSERT(!server.empty());
    113   ASSERT(!client_name.empty());
    114 
    115   if (state_ != NOT_CONNECTED) {
    116     LOG(WARNING)
    117         << "The client must not be connected before you can call Connect()";
    118     callback_->OnServerConnectionFailure();
    119     return;
    120   }
    121 
    122   if (server.empty() || client_name.empty()) {
    123     callback_->OnServerConnectionFailure();
    124     return;
    125   }
    126 
    127   if (port <= 0)
    128     port = kDefaultServerPort;
    129 
    130   server_address_.SetIP(server);
    131   server_address_.SetPort(port);
    132   client_name_ = client_name;
    133 
    134   if (server_address_.IsUnresolved()) {
    135     state_ = RESOLVING;
    136     resolver_ = new talk_base::AsyncResolver();
    137     resolver_->SignalWorkDone.connect(this,
    138                                       &PeerConnectionClient::OnResolveResult);
    139     resolver_->set_address(server_address_);
    140     resolver_->Start();
    141   } else {
    142     DoConnect();
    143   }
    144 }
    145 
    146 void PeerConnectionClient::OnResolveResult(talk_base::SignalThread *t) {
    147   if (resolver_->error() != 0) {
    148     callback_->OnServerConnectionFailure();
    149     resolver_->Destroy(false);
    150     resolver_ = NULL;
    151     state_ = NOT_CONNECTED;
    152   } else {
    153     server_address_ = resolver_->address();
    154     DoConnect();
    155   }
    156 }
    157 
    158 void PeerConnectionClient::DoConnect() {
    159   control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
    160   hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
    161   InitSocketSignals();
    162   char buffer[1024];
    163   sprintfn(buffer, sizeof(buffer),
    164            "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
    165   onconnect_data_ = buffer;
    166 
    167   bool ret = ConnectControlSocket();
    168   if (ret)
    169     state_ = SIGNING_IN;
    170   if (!ret) {
    171     callback_->OnServerConnectionFailure();
    172   }
    173 }
    174 
    175 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
    176   if (state_ != CONNECTED)
    177     return false;
    178 
    179   ASSERT(is_connected());
    180   ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
    181   if (!is_connected() || peer_id == -1)
    182     return false;
    183 
    184   char headers[1024];
    185   sprintfn(headers, sizeof(headers),
    186       "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
    187       "Content-Length: %i\r\n"
    188       "Content-Type: text/plain\r\n"
    189       "\r\n",
    190       my_id_, peer_id, message.length());
    191   onconnect_data_ = headers;
    192   onconnect_data_ += message;
    193   return ConnectControlSocket();
    194 }
    195 
    196 bool PeerConnectionClient::SendHangUp(int peer_id) {
    197   return SendToPeer(peer_id, kByeMessage);
    198 }
    199 
    200 bool PeerConnectionClient::IsSendingMessage() {
    201   return state_ == CONNECTED &&
    202          control_socket_->GetState() != talk_base::Socket::CS_CLOSED;
    203 }
    204 
    205 bool PeerConnectionClient::SignOut() {
    206   if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
    207     return true;
    208 
    209   if (hanging_get_->GetState() != talk_base::Socket::CS_CLOSED)
    210     hanging_get_->Close();
    211 
    212   if (control_socket_->GetState() == talk_base::Socket::CS_CLOSED) {
    213     state_ = SIGNING_OUT;
    214 
    215     if (my_id_ != -1) {
    216       char buffer[1024];
    217       sprintfn(buffer, sizeof(buffer),
    218           "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
    219       onconnect_data_ = buffer;
    220       return ConnectControlSocket();
    221     } else {
    222       // Can occur if the app is closed before we finish connecting.
    223       return true;
    224     }
    225   } else {
    226     state_ = SIGNING_OUT_WAITING;
    227   }
    228 
    229   return true;
    230 }
    231 
    232 void PeerConnectionClient::Close() {
    233   control_socket_->Close();
    234   hanging_get_->Close();
    235   onconnect_data_.clear();
    236   peers_.clear();
    237   if (resolver_ != NULL) {
    238     resolver_->Destroy(false);
    239     resolver_ = NULL;
    240   }
    241   my_id_ = -1;
    242   state_ = NOT_CONNECTED;
    243 }
    244 
    245 bool PeerConnectionClient::ConnectControlSocket() {
    246   ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
    247   int err = control_socket_->Connect(server_address_);
    248   if (err == SOCKET_ERROR) {
    249     Close();
    250     return false;
    251   }
    252   return true;
    253 }
    254 
    255 void PeerConnectionClient::OnConnect(talk_base::AsyncSocket* socket) {
    256   ASSERT(!onconnect_data_.empty());
    257   size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
    258   ASSERT(sent == onconnect_data_.length());
    259   UNUSED(sent);
    260   onconnect_data_.clear();
    261 }
    262 
    263 void PeerConnectionClient::OnHangingGetConnect(talk_base::AsyncSocket* socket) {
    264   char buffer[1024];
    265   sprintfn(buffer, sizeof(buffer),
    266            "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
    267   int len = static_cast<int>(strlen(buffer));
    268   int sent = socket->Send(buffer, len);
    269   ASSERT(sent == len);
    270   UNUSED2(sent, len);
    271 }
    272 
    273 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
    274                                              const std::string& message) {
    275   if (message.length() == (sizeof(kByeMessage) - 1) &&
    276       message.compare(kByeMessage) == 0) {
    277     callback_->OnPeerDisconnected(peer_id);
    278   } else {
    279     callback_->OnMessageFromPeer(peer_id, message);
    280   }
    281 }
    282 
    283 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
    284                                           size_t eoh,
    285                                           const char* header_pattern,
    286                                           size_t* value) {
    287   ASSERT(value != NULL);
    288   size_t found = data.find(header_pattern);
    289   if (found != std::string::npos && found < eoh) {
    290     *value = atoi(&data[found + strlen(header_pattern)]);
    291     return true;
    292   }
    293   return false;
    294 }
    295 
    296 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
    297                                           const char* header_pattern,
    298                                           std::string* value) {
    299   ASSERT(value != NULL);
    300   size_t found = data.find(header_pattern);
    301   if (found != std::string::npos && found < eoh) {
    302     size_t begin = found + strlen(header_pattern);
    303     size_t end = data.find("\r\n", begin);
    304     if (end == std::string::npos)
    305       end = eoh;
    306     value->assign(data.substr(begin, end - begin));
    307     return true;
    308   }
    309   return false;
    310 }
    311 
    312 bool PeerConnectionClient::ReadIntoBuffer(talk_base::AsyncSocket* socket,
    313                                           std::string* data,
    314                                           size_t* content_length) {
    315   char buffer[0xffff];
    316   do {
    317     int bytes = socket->Recv(buffer, sizeof(buffer));
    318     if (bytes <= 0)
    319       break;
    320     data->append(buffer, bytes);
    321   } while (true);
    322 
    323   bool ret = false;
    324   size_t i = data->find("\r\n\r\n");
    325   if (i != std::string::npos) {
    326     LOG(INFO) << "Headers received";
    327     if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
    328       size_t total_response_size = (i + 4) + *content_length;
    329       if (data->length() >= total_response_size) {
    330         ret = true;
    331         std::string should_close;
    332         const char kConnection[] = "\r\nConnection: ";
    333         if (GetHeaderValue(*data, i, kConnection, &should_close) &&
    334             should_close.compare("close") == 0) {
    335           socket->Close();
    336           // Since we closed the socket, there was no notification delivered
    337           // to us.  Compensate by letting ourselves know.
    338           OnClose(socket, 0);
    339         }
    340       } else {
    341         // We haven't received everything.  Just continue to accept data.
    342       }
    343     } else {
    344       LOG(LS_ERROR) << "No content length field specified by the server.";
    345     }
    346   }
    347   return ret;
    348 }
    349 
    350 void PeerConnectionClient::OnRead(talk_base::AsyncSocket* socket) {
    351   size_t content_length = 0;
    352   if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
    353     size_t peer_id = 0, eoh = 0;
    354     bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
    355                                   &eoh);
    356     if (ok) {
    357       if (my_id_ == -1) {
    358         // First response.  Let's store our server assigned ID.
    359         ASSERT(state_ == SIGNING_IN);
    360         my_id_ = static_cast<int>(peer_id);
    361         ASSERT(my_id_ != -1);
    362 
    363         // The body of the response will be a list of already connected peers.
    364         if (content_length) {
    365           size_t pos = eoh + 4;
    366           while (pos < control_data_.size()) {
    367             size_t eol = control_data_.find('\n', pos);
    368             if (eol == std::string::npos)
    369               break;
    370             int id = 0;
    371             std::string name;
    372             bool connected;
    373             if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
    374                            &connected) && id != my_id_) {
    375               peers_[id] = name;
    376               callback_->OnPeerConnected(id, name);
    377             }
    378             pos = eol + 1;
    379           }
    380         }
    381         ASSERT(is_connected());
    382         callback_->OnSignedIn();
    383       } else if (state_ == SIGNING_OUT) {
    384         Close();
    385         callback_->OnDisconnected();
    386       } else if (state_ == SIGNING_OUT_WAITING) {
    387         SignOut();
    388       }
    389     }
    390 
    391     control_data_.clear();
    392 
    393     if (state_ == SIGNING_IN) {
    394       ASSERT(hanging_get_->GetState() == talk_base::Socket::CS_CLOSED);
    395       state_ = CONNECTED;
    396       hanging_get_->Connect(server_address_);
    397     }
    398   }
    399 }
    400 
    401 void PeerConnectionClient::OnHangingGetRead(talk_base::AsyncSocket* socket) {
    402   LOG(INFO) << __FUNCTION__;
    403   size_t content_length = 0;
    404   if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
    405     size_t peer_id = 0, eoh = 0;
    406     bool ok = ParseServerResponse(notification_data_, content_length,
    407                                   &peer_id, &eoh);
    408 
    409     if (ok) {
    410       // Store the position where the body begins.
    411       size_t pos = eoh + 4;
    412 
    413       if (my_id_ == static_cast<int>(peer_id)) {
    414         // A notification about a new member or a member that just
    415         // disconnected.
    416         int id = 0;
    417         std::string name;
    418         bool connected = false;
    419         if (ParseEntry(notification_data_.substr(pos), &name, &id,
    420                        &connected)) {
    421           if (connected) {
    422             peers_[id] = name;
    423             callback_->OnPeerConnected(id, name);
    424           } else {
    425             peers_.erase(id);
    426             callback_->OnPeerDisconnected(id);
    427           }
    428         }
    429       } else {
    430         OnMessageFromPeer(static_cast<int>(peer_id),
    431                           notification_data_.substr(pos));
    432       }
    433     }
    434 
    435     notification_data_.clear();
    436   }
    437 
    438   if (hanging_get_->GetState() == talk_base::Socket::CS_CLOSED &&
    439       state_ == CONNECTED) {
    440     hanging_get_->Connect(server_address_);
    441   }
    442 }
    443 
    444 bool PeerConnectionClient::ParseEntry(const std::string& entry,
    445                                       std::string* name,
    446                                       int* id,
    447                                       bool* connected) {
    448   ASSERT(name != NULL);
    449   ASSERT(id != NULL);
    450   ASSERT(connected != NULL);
    451   ASSERT(!entry.empty());
    452 
    453   *connected = false;
    454   size_t separator = entry.find(',');
    455   if (separator != std::string::npos) {
    456     *id = atoi(&entry[separator + 1]);
    457     name->assign(entry.substr(0, separator));
    458     separator = entry.find(',', separator + 1);
    459     if (separator != std::string::npos) {
    460       *connected = atoi(&entry[separator + 1]) ? true : false;
    461     }
    462   }
    463   return !name->empty();
    464 }
    465 
    466 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
    467   int status = -1;
    468   size_t pos = response.find(' ');
    469   if (pos != std::string::npos)
    470     status = atoi(&response[pos + 1]);
    471   return status;
    472 }
    473 
    474 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
    475                                                size_t content_length,
    476                                                size_t* peer_id,
    477                                                size_t* eoh) {
    478   int status = GetResponseStatus(response.c_str());
    479   if (status != 200) {
    480     LOG(LS_ERROR) << "Received error from server";
    481     Close();
    482     callback_->OnDisconnected();
    483     return false;
    484   }
    485 
    486   *eoh = response.find("\r\n\r\n");
    487   ASSERT(*eoh != std::string::npos);
    488   if (*eoh == std::string::npos)
    489     return false;
    490 
    491   *peer_id = -1;
    492 
    493   // See comment in peer_channel.cc for why we use the Pragma header and
    494   // not e.g. "X-Peer-Id".
    495   GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
    496 
    497   return true;
    498 }
    499 
    500 void PeerConnectionClient::OnClose(talk_base::AsyncSocket* socket, int err) {
    501   LOG(INFO) << __FUNCTION__;
    502 
    503   socket->Close();
    504 
    505 #ifdef WIN32
    506   if (err != WSAECONNREFUSED) {
    507 #else
    508   if (err != ECONNREFUSED) {
    509 #endif
    510     if (socket == hanging_get_.get()) {
    511       if (state_ == CONNECTED) {
    512         hanging_get_->Close();
    513         hanging_get_->Connect(server_address_);
    514       }
    515     } else {
    516       callback_->OnMessageSent(err);
    517     }
    518   } else {
    519     if (socket == control_socket_.get()) {
    520       LOG(WARNING) << "Connection refused; retrying in 2 seconds";
    521       talk_base::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
    522     } else {
    523       Close();
    524       callback_->OnDisconnected();
    525     }
    526   }
    527 }
    528 
    529 void PeerConnectionClient::OnMessage(talk_base::Message* msg) {
    530   // ignore msg; there is currently only one supported message ("retry")
    531   DoConnect();
    532 }
    533