Home | History | Annotate | Download | only in client
      1 /*
      2  *  Copyright 2012 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/examples/peerconnection/client/peer_connection_client.h"
     12 
     13 #include "webrtc/examples/peerconnection/client/defaults.h"
     14 #include "webrtc/base/common.h"
     15 #include "webrtc/base/logging.h"
     16 #include "webrtc/base/nethelpers.h"
     17 #include "webrtc/base/stringutils.h"
     18 
     19 #ifdef WIN32
     20 #include "webrtc/base/win32socketserver.h"
     21 #endif
     22 
     23 using rtc::sprintfn;
     24 
     25 namespace {
     26 
     27 // This is our magical hangup signal.
     28 const char kByeMessage[] = "BYE";
     29 // Delay between server connection retries, in milliseconds
     30 const int kReconnectDelay = 2000;
     31 
     32 rtc::AsyncSocket* CreateClientSocket(int family) {
     33 #ifdef WIN32
     34   rtc::Win32Socket* sock = new rtc::Win32Socket();
     35   sock->CreateT(family, SOCK_STREAM);
     36   return sock;
     37 #elif defined(WEBRTC_POSIX)
     38   rtc::Thread* thread = rtc::Thread::Current();
     39   ASSERT(thread != NULL);
     40   return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
     41 #else
     42 #error Platform not supported.
     43 #endif
     44 }
     45 
     46 }  // namespace
     47 
     48 PeerConnectionClient::PeerConnectionClient()
     49   : callback_(NULL),
     50     resolver_(NULL),
     51     state_(NOT_CONNECTED),
     52     my_id_(-1) {
     53 }
     54 
     55 PeerConnectionClient::~PeerConnectionClient() {
     56 }
     57 
     58 void PeerConnectionClient::InitSocketSignals() {
     59   ASSERT(control_socket_.get() != NULL);
     60   ASSERT(hanging_get_.get() != NULL);
     61   control_socket_->SignalCloseEvent.connect(this,
     62       &PeerConnectionClient::OnClose);
     63   hanging_get_->SignalCloseEvent.connect(this,
     64       &PeerConnectionClient::OnClose);
     65   control_socket_->SignalConnectEvent.connect(this,
     66       &PeerConnectionClient::OnConnect);
     67   hanging_get_->SignalConnectEvent.connect(this,
     68       &PeerConnectionClient::OnHangingGetConnect);
     69   control_socket_->SignalReadEvent.connect(this,
     70       &PeerConnectionClient::OnRead);
     71   hanging_get_->SignalReadEvent.connect(this,
     72       &PeerConnectionClient::OnHangingGetRead);
     73 }
     74 
     75 int PeerConnectionClient::id() const {
     76   return my_id_;
     77 }
     78 
     79 bool PeerConnectionClient::is_connected() const {
     80   return my_id_ != -1;
     81 }
     82 
     83 const Peers& PeerConnectionClient::peers() const {
     84   return peers_;
     85 }
     86 
     87 void PeerConnectionClient::RegisterObserver(
     88     PeerConnectionClientObserver* callback) {
     89   ASSERT(!callback_);
     90   callback_ = callback;
     91 }
     92 
     93 void PeerConnectionClient::Connect(const std::string& server, int port,
     94                                    const std::string& client_name) {
     95   ASSERT(!server.empty());
     96   ASSERT(!client_name.empty());
     97 
     98   if (state_ != NOT_CONNECTED) {
     99     LOG(WARNING)
    100         << "The client must not be connected before you can call Connect()";
    101     callback_->OnServerConnectionFailure();
    102     return;
    103   }
    104 
    105   if (server.empty() || client_name.empty()) {
    106     callback_->OnServerConnectionFailure();
    107     return;
    108   }
    109 
    110   if (port <= 0)
    111     port = kDefaultServerPort;
    112 
    113   server_address_.SetIP(server);
    114   server_address_.SetPort(port);
    115   client_name_ = client_name;
    116 
    117   if (server_address_.IsUnresolvedIP()) {
    118     state_ = RESOLVING;
    119     resolver_ = new rtc::AsyncResolver();
    120     resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
    121     resolver_->Start(server_address_);
    122   } else {
    123     DoConnect();
    124   }
    125 }
    126 
    127 void PeerConnectionClient::OnResolveResult(
    128     rtc::AsyncResolverInterface* resolver) {
    129   if (resolver_->GetError() != 0) {
    130     callback_->OnServerConnectionFailure();
    131     resolver_->Destroy(false);
    132     resolver_ = NULL;
    133     state_ = NOT_CONNECTED;
    134   } else {
    135     server_address_ = resolver_->address();
    136     DoConnect();
    137   }
    138 }
    139 
    140 void PeerConnectionClient::DoConnect() {
    141   control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
    142   hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
    143   InitSocketSignals();
    144   char buffer[1024];
    145   sprintfn(buffer, sizeof(buffer),
    146            "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
    147   onconnect_data_ = buffer;
    148 
    149   bool ret = ConnectControlSocket();
    150   if (ret)
    151     state_ = SIGNING_IN;
    152   if (!ret) {
    153     callback_->OnServerConnectionFailure();
    154   }
    155 }
    156 
    157 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
    158   if (state_ != CONNECTED)
    159     return false;
    160 
    161   ASSERT(is_connected());
    162   ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
    163   if (!is_connected() || peer_id == -1)
    164     return false;
    165 
    166   char headers[1024];
    167   sprintfn(headers, sizeof(headers),
    168       "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
    169       "Content-Length: %i\r\n"
    170       "Content-Type: text/plain\r\n"
    171       "\r\n",
    172       my_id_, peer_id, message.length());
    173   onconnect_data_ = headers;
    174   onconnect_data_ += message;
    175   return ConnectControlSocket();
    176 }
    177 
    178 bool PeerConnectionClient::SendHangUp(int peer_id) {
    179   return SendToPeer(peer_id, kByeMessage);
    180 }
    181 
    182 bool PeerConnectionClient::IsSendingMessage() {
    183   return state_ == CONNECTED &&
    184          control_socket_->GetState() != rtc::Socket::CS_CLOSED;
    185 }
    186 
    187 bool PeerConnectionClient::SignOut() {
    188   if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
    189     return true;
    190 
    191   if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
    192     hanging_get_->Close();
    193 
    194   if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
    195     state_ = SIGNING_OUT;
    196 
    197     if (my_id_ != -1) {
    198       char buffer[1024];
    199       sprintfn(buffer, sizeof(buffer),
    200           "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
    201       onconnect_data_ = buffer;
    202       return ConnectControlSocket();
    203     } else {
    204       // Can occur if the app is closed before we finish connecting.
    205       return true;
    206     }
    207   } else {
    208     state_ = SIGNING_OUT_WAITING;
    209   }
    210 
    211   return true;
    212 }
    213 
    214 void PeerConnectionClient::Close() {
    215   control_socket_->Close();
    216   hanging_get_->Close();
    217   onconnect_data_.clear();
    218   peers_.clear();
    219   if (resolver_ != NULL) {
    220     resolver_->Destroy(false);
    221     resolver_ = NULL;
    222   }
    223   my_id_ = -1;
    224   state_ = NOT_CONNECTED;
    225 }
    226 
    227 bool PeerConnectionClient::ConnectControlSocket() {
    228   ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
    229   int err = control_socket_->Connect(server_address_);
    230   if (err == SOCKET_ERROR) {
    231     Close();
    232     return false;
    233   }
    234   return true;
    235 }
    236 
    237 void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
    238   ASSERT(!onconnect_data_.empty());
    239   size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
    240   ASSERT(sent == onconnect_data_.length());
    241   RTC_UNUSED(sent);
    242   onconnect_data_.clear();
    243 }
    244 
    245 void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
    246   char buffer[1024];
    247   sprintfn(buffer, sizeof(buffer),
    248            "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
    249   int len = static_cast<int>(strlen(buffer));
    250   int sent = socket->Send(buffer, len);
    251   ASSERT(sent == len);
    252   RTC_UNUSED2(sent, len);
    253 }
    254 
    255 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
    256                                              const std::string& message) {
    257   if (message.length() == (sizeof(kByeMessage) - 1) &&
    258       message.compare(kByeMessage) == 0) {
    259     callback_->OnPeerDisconnected(peer_id);
    260   } else {
    261     callback_->OnMessageFromPeer(peer_id, message);
    262   }
    263 }
    264 
    265 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
    266                                           size_t eoh,
    267                                           const char* header_pattern,
    268                                           size_t* value) {
    269   ASSERT(value != NULL);
    270   size_t found = data.find(header_pattern);
    271   if (found != std::string::npos && found < eoh) {
    272     *value = atoi(&data[found + strlen(header_pattern)]);
    273     return true;
    274   }
    275   return false;
    276 }
    277 
    278 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
    279                                           const char* header_pattern,
    280                                           std::string* value) {
    281   ASSERT(value != NULL);
    282   size_t found = data.find(header_pattern);
    283   if (found != std::string::npos && found < eoh) {
    284     size_t begin = found + strlen(header_pattern);
    285     size_t end = data.find("\r\n", begin);
    286     if (end == std::string::npos)
    287       end = eoh;
    288     value->assign(data.substr(begin, end - begin));
    289     return true;
    290   }
    291   return false;
    292 }
    293 
    294 bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
    295                                           std::string* data,
    296                                           size_t* content_length) {
    297   char buffer[0xffff];
    298   do {
    299     int bytes = socket->Recv(buffer, sizeof(buffer));
    300     if (bytes <= 0)
    301       break;
    302     data->append(buffer, bytes);
    303   } while (true);
    304 
    305   bool ret = false;
    306   size_t i = data->find("\r\n\r\n");
    307   if (i != std::string::npos) {
    308     LOG(INFO) << "Headers received";
    309     if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
    310       size_t total_response_size = (i + 4) + *content_length;
    311       if (data->length() >= total_response_size) {
    312         ret = true;
    313         std::string should_close;
    314         const char kConnection[] = "\r\nConnection: ";
    315         if (GetHeaderValue(*data, i, kConnection, &should_close) &&
    316             should_close.compare("close") == 0) {
    317           socket->Close();
    318           // Since we closed the socket, there was no notification delivered
    319           // to us.  Compensate by letting ourselves know.
    320           OnClose(socket, 0);
    321         }
    322       } else {
    323         // We haven't received everything.  Just continue to accept data.
    324       }
    325     } else {
    326       LOG(LS_ERROR) << "No content length field specified by the server.";
    327     }
    328   }
    329   return ret;
    330 }
    331 
    332 void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
    333   size_t content_length = 0;
    334   if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
    335     size_t peer_id = 0, eoh = 0;
    336     bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
    337                                   &eoh);
    338     if (ok) {
    339       if (my_id_ == -1) {
    340         // First response.  Let's store our server assigned ID.
    341         ASSERT(state_ == SIGNING_IN);
    342         my_id_ = static_cast<int>(peer_id);
    343         ASSERT(my_id_ != -1);
    344 
    345         // The body of the response will be a list of already connected peers.
    346         if (content_length) {
    347           size_t pos = eoh + 4;
    348           while (pos < control_data_.size()) {
    349             size_t eol = control_data_.find('\n', pos);
    350             if (eol == std::string::npos)
    351               break;
    352             int id = 0;
    353             std::string name;
    354             bool connected;
    355             if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
    356                            &connected) && id != my_id_) {
    357               peers_[id] = name;
    358               callback_->OnPeerConnected(id, name);
    359             }
    360             pos = eol + 1;
    361           }
    362         }
    363         ASSERT(is_connected());
    364         callback_->OnSignedIn();
    365       } else if (state_ == SIGNING_OUT) {
    366         Close();
    367         callback_->OnDisconnected();
    368       } else if (state_ == SIGNING_OUT_WAITING) {
    369         SignOut();
    370       }
    371     }
    372 
    373     control_data_.clear();
    374 
    375     if (state_ == SIGNING_IN) {
    376       ASSERT(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
    377       state_ = CONNECTED;
    378       hanging_get_->Connect(server_address_);
    379     }
    380   }
    381 }
    382 
    383 void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
    384   LOG(INFO) << __FUNCTION__;
    385   size_t content_length = 0;
    386   if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
    387     size_t peer_id = 0, eoh = 0;
    388     bool ok = ParseServerResponse(notification_data_, content_length,
    389                                   &peer_id, &eoh);
    390 
    391     if (ok) {
    392       // Store the position where the body begins.
    393       size_t pos = eoh + 4;
    394 
    395       if (my_id_ == static_cast<int>(peer_id)) {
    396         // A notification about a new member or a member that just
    397         // disconnected.
    398         int id = 0;
    399         std::string name;
    400         bool connected = false;
    401         if (ParseEntry(notification_data_.substr(pos), &name, &id,
    402                        &connected)) {
    403           if (connected) {
    404             peers_[id] = name;
    405             callback_->OnPeerConnected(id, name);
    406           } else {
    407             peers_.erase(id);
    408             callback_->OnPeerDisconnected(id);
    409           }
    410         }
    411       } else {
    412         OnMessageFromPeer(static_cast<int>(peer_id),
    413                           notification_data_.substr(pos));
    414       }
    415     }
    416 
    417     notification_data_.clear();
    418   }
    419 
    420   if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
    421       state_ == CONNECTED) {
    422     hanging_get_->Connect(server_address_);
    423   }
    424 }
    425 
    426 bool PeerConnectionClient::ParseEntry(const std::string& entry,
    427                                       std::string* name,
    428                                       int* id,
    429                                       bool* connected) {
    430   ASSERT(name != NULL);
    431   ASSERT(id != NULL);
    432   ASSERT(connected != NULL);
    433   ASSERT(!entry.empty());
    434 
    435   *connected = false;
    436   size_t separator = entry.find(',');
    437   if (separator != std::string::npos) {
    438     *id = atoi(&entry[separator + 1]);
    439     name->assign(entry.substr(0, separator));
    440     separator = entry.find(',', separator + 1);
    441     if (separator != std::string::npos) {
    442       *connected = atoi(&entry[separator + 1]) ? true : false;
    443     }
    444   }
    445   return !name->empty();
    446 }
    447 
    448 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
    449   int status = -1;
    450   size_t pos = response.find(' ');
    451   if (pos != std::string::npos)
    452     status = atoi(&response[pos + 1]);
    453   return status;
    454 }
    455 
    456 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
    457                                                size_t content_length,
    458                                                size_t* peer_id,
    459                                                size_t* eoh) {
    460   int status = GetResponseStatus(response.c_str());
    461   if (status != 200) {
    462     LOG(LS_ERROR) << "Received error from server";
    463     Close();
    464     callback_->OnDisconnected();
    465     return false;
    466   }
    467 
    468   *eoh = response.find("\r\n\r\n");
    469   ASSERT(*eoh != std::string::npos);
    470   if (*eoh == std::string::npos)
    471     return false;
    472 
    473   *peer_id = -1;
    474 
    475   // See comment in peer_channel.cc for why we use the Pragma header and
    476   // not e.g. "X-Peer-Id".
    477   GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
    478 
    479   return true;
    480 }
    481 
    482 void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
    483   LOG(INFO) << __FUNCTION__;
    484 
    485   socket->Close();
    486 
    487 #ifdef WIN32
    488   if (err != WSAECONNREFUSED) {
    489 #else
    490   if (err != ECONNREFUSED) {
    491 #endif
    492     if (socket == hanging_get_.get()) {
    493       if (state_ == CONNECTED) {
    494         hanging_get_->Close();
    495         hanging_get_->Connect(server_address_);
    496       }
    497     } else {
    498       callback_->OnMessageSent(err);
    499     }
    500   } else {
    501     if (socket == control_socket_.get()) {
    502       LOG(WARNING) << "Connection refused; retrying in 2 seconds";
    503       rtc::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
    504     } else {
    505       Close();
    506       callback_->OnDisconnected();
    507     }
    508   }
    509 }
    510 
    511 void PeerConnectionClient::OnMessage(rtc::Message* msg) {
    512   // ignore msg; there is currently only one supported message ("retry")
    513   DoConnect();
    514 }
    515