1 /* 2 * libjingle 3 * Copyright 2012, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/examples/peerconnection/client/peer_connection_client.h" 29 30 #include "talk/examples/peerconnection/client/defaults.h" 31 #include "talk/base/common.h" 32 #include "talk/base/nethelpers.h" 33 #include "talk/base/logging.h" 34 #include "talk/base/stringutils.h" 35 36 #ifdef WIN32 37 #include "talk/base/win32socketserver.h" 38 #endif 39 40 using talk_base::sprintfn; 41 42 namespace { 43 44 // This is our magical hangup signal. 45 const char kByeMessage[] = "BYE"; 46 // Delay between server connection retries, in milliseconds 47 const int kReconnectDelay = 2000; 48 49 talk_base::AsyncSocket* CreateClientSocket(int family) { 50 #ifdef WIN32 51 talk_base::Win32Socket* sock = new talk_base::Win32Socket(); 52 sock->CreateT(family, SOCK_STREAM); 53 return sock; 54 #elif defined(POSIX) 55 talk_base::Thread* thread = talk_base::Thread::Current(); 56 ASSERT(thread != NULL); 57 return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM); 58 #else 59 #error Platform not supported. 60 #endif 61 } 62 63 } 64 65 PeerConnectionClient::PeerConnectionClient() 66 : callback_(NULL), 67 resolver_(NULL), 68 state_(NOT_CONNECTED), 69 my_id_(-1) { 70 } 71 72 PeerConnectionClient::~PeerConnectionClient() { 73 } 74 75 void PeerConnectionClient::InitSocketSignals() { 76 ASSERT(control_socket_.get() != NULL); 77 ASSERT(hanging_get_.get() != NULL); 78 control_socket_->SignalCloseEvent.connect(this, 79 &PeerConnectionClient::OnClose); 80 hanging_get_->SignalCloseEvent.connect(this, 81 &PeerConnectionClient::OnClose); 82 control_socket_->SignalConnectEvent.connect(this, 83 &PeerConnectionClient::OnConnect); 84 hanging_get_->SignalConnectEvent.connect(this, 85 &PeerConnectionClient::OnHangingGetConnect); 86 control_socket_->SignalReadEvent.connect(this, 87 &PeerConnectionClient::OnRead); 88 hanging_get_->SignalReadEvent.connect(this, 89 &PeerConnectionClient::OnHangingGetRead); 90 } 91 92 int PeerConnectionClient::id() const { 93 return my_id_; 94 } 95 96 bool PeerConnectionClient::is_connected() const { 97 return my_id_ != -1; 98 } 99 100 const Peers& PeerConnectionClient::peers() const { 101 return peers_; 102 } 103 104 void PeerConnectionClient::RegisterObserver( 105 PeerConnectionClientObserver* callback) { 106 ASSERT(!callback_); 107 callback_ = callback; 108 } 109 110 void PeerConnectionClient::Connect(const std::string& server, int port, 111 const std::string& client_name) { 112 ASSERT(!server.empty()); 113 ASSERT(!client_name.empty()); 114 115 if (state_ != NOT_CONNECTED) { 116 LOG(WARNING) 117 << "The client must not be connected before you can call Connect()"; 118 callback_->OnServerConnectionFailure(); 119 return; 120 } 121 122 if (server.empty() || client_name.empty()) { 123 callback_->OnServerConnectionFailure(); 124 return; 125 } 126 127 if (port <= 0) 128 port = kDefaultServerPort; 129 130 server_address_.SetIP(server); 131 server_address_.SetPort(port); 132 client_name_ = client_name; 133 134 if (server_address_.IsUnresolved()) { 135 state_ = RESOLVING; 136 resolver_ = new talk_base::AsyncResolver(); 137 resolver_->SignalWorkDone.connect(this, 138 &PeerConnectionClient::OnResolveResult); 139 resolver_->set_address(server_address_); 140 resolver_->Start(); 141 } else { 142 DoConnect(); 143 } 144 } 145 146 void PeerConnectionClient::OnResolveResult(talk_base::SignalThread *t) { 147 if (resolver_->error() != 0) { 148 callback_->OnServerConnectionFailure(); 149 resolver_->Destroy(false); 150 resolver_ = NULL; 151 state_ = NOT_CONNECTED; 152 } else { 153 server_address_ = resolver_->address(); 154 DoConnect(); 155 } 156 } 157 158 void PeerConnectionClient::DoConnect() { 159 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family())); 160 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family())); 161 InitSocketSignals(); 162 char buffer[1024]; 163 sprintfn(buffer, sizeof(buffer), 164 "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str()); 165 onconnect_data_ = buffer; 166 167 bool ret = ConnectControlSocket(); 168 if (ret) 169 state_ = SIGNING_IN; 170 if (!ret) { 171 callback_->OnServerConnectionFailure(); 172 } 173 } 174 175 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) { 176 if (state_ != CONNECTED) 177 return false; 178 179 ASSERT(is_connected()); 180 ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED); 181 if (!is_connected() || peer_id == -1) 182 return false; 183 184 char headers[1024]; 185 sprintfn(headers, sizeof(headers), 186 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n" 187 "Content-Length: %i\r\n" 188 "Content-Type: text/plain\r\n" 189 "\r\n", 190 my_id_, peer_id, message.length()); 191 onconnect_data_ = headers; 192 onconnect_data_ += message; 193 return ConnectControlSocket(); 194 } 195 196 bool PeerConnectionClient::SendHangUp(int peer_id) { 197 return SendToPeer(peer_id, kByeMessage); 198 } 199 200 bool PeerConnectionClient::IsSendingMessage() { 201 return state_ == CONNECTED && 202 control_socket_->GetState() != talk_base::Socket::CS_CLOSED; 203 } 204 205 bool PeerConnectionClient::SignOut() { 206 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT) 207 return true; 208 209 if (hanging_get_->GetState() != talk_base::Socket::CS_CLOSED) 210 hanging_get_->Close(); 211 212 if (control_socket_->GetState() == talk_base::Socket::CS_CLOSED) { 213 state_ = SIGNING_OUT; 214 215 if (my_id_ != -1) { 216 char buffer[1024]; 217 sprintfn(buffer, sizeof(buffer), 218 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_); 219 onconnect_data_ = buffer; 220 return ConnectControlSocket(); 221 } else { 222 // Can occur if the app is closed before we finish connecting. 223 return true; 224 } 225 } else { 226 state_ = SIGNING_OUT_WAITING; 227 } 228 229 return true; 230 } 231 232 void PeerConnectionClient::Close() { 233 control_socket_->Close(); 234 hanging_get_->Close(); 235 onconnect_data_.clear(); 236 peers_.clear(); 237 if (resolver_ != NULL) { 238 resolver_->Destroy(false); 239 resolver_ = NULL; 240 } 241 my_id_ = -1; 242 state_ = NOT_CONNECTED; 243 } 244 245 bool PeerConnectionClient::ConnectControlSocket() { 246 ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED); 247 int err = control_socket_->Connect(server_address_); 248 if (err == SOCKET_ERROR) { 249 Close(); 250 return false; 251 } 252 return true; 253 } 254 255 void PeerConnectionClient::OnConnect(talk_base::AsyncSocket* socket) { 256 ASSERT(!onconnect_data_.empty()); 257 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length()); 258 ASSERT(sent == onconnect_data_.length()); 259 UNUSED(sent); 260 onconnect_data_.clear(); 261 } 262 263 void PeerConnectionClient::OnHangingGetConnect(talk_base::AsyncSocket* socket) { 264 char buffer[1024]; 265 sprintfn(buffer, sizeof(buffer), 266 "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_); 267 int len = static_cast<int>(strlen(buffer)); 268 int sent = socket->Send(buffer, len); 269 ASSERT(sent == len); 270 UNUSED2(sent, len); 271 } 272 273 void PeerConnectionClient::OnMessageFromPeer(int peer_id, 274 const std::string& message) { 275 if (message.length() == (sizeof(kByeMessage) - 1) && 276 message.compare(kByeMessage) == 0) { 277 callback_->OnPeerDisconnected(peer_id); 278 } else { 279 callback_->OnMessageFromPeer(peer_id, message); 280 } 281 } 282 283 bool PeerConnectionClient::GetHeaderValue(const std::string& data, 284 size_t eoh, 285 const char* header_pattern, 286 size_t* value) { 287 ASSERT(value != NULL); 288 size_t found = data.find(header_pattern); 289 if (found != std::string::npos && found < eoh) { 290 *value = atoi(&data[found + strlen(header_pattern)]); 291 return true; 292 } 293 return false; 294 } 295 296 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh, 297 const char* header_pattern, 298 std::string* value) { 299 ASSERT(value != NULL); 300 size_t found = data.find(header_pattern); 301 if (found != std::string::npos && found < eoh) { 302 size_t begin = found + strlen(header_pattern); 303 size_t end = data.find("\r\n", begin); 304 if (end == std::string::npos) 305 end = eoh; 306 value->assign(data.substr(begin, end - begin)); 307 return true; 308 } 309 return false; 310 } 311 312 bool PeerConnectionClient::ReadIntoBuffer(talk_base::AsyncSocket* socket, 313 std::string* data, 314 size_t* content_length) { 315 char buffer[0xffff]; 316 do { 317 int bytes = socket->Recv(buffer, sizeof(buffer)); 318 if (bytes <= 0) 319 break; 320 data->append(buffer, bytes); 321 } while (true); 322 323 bool ret = false; 324 size_t i = data->find("\r\n\r\n"); 325 if (i != std::string::npos) { 326 LOG(INFO) << "Headers received"; 327 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) { 328 size_t total_response_size = (i + 4) + *content_length; 329 if (data->length() >= total_response_size) { 330 ret = true; 331 std::string should_close; 332 const char kConnection[] = "\r\nConnection: "; 333 if (GetHeaderValue(*data, i, kConnection, &should_close) && 334 should_close.compare("close") == 0) { 335 socket->Close(); 336 // Since we closed the socket, there was no notification delivered 337 // to us. Compensate by letting ourselves know. 338 OnClose(socket, 0); 339 } 340 } else { 341 // We haven't received everything. Just continue to accept data. 342 } 343 } else { 344 LOG(LS_ERROR) << "No content length field specified by the server."; 345 } 346 } 347 return ret; 348 } 349 350 void PeerConnectionClient::OnRead(talk_base::AsyncSocket* socket) { 351 size_t content_length = 0; 352 if (ReadIntoBuffer(socket, &control_data_, &content_length)) { 353 size_t peer_id = 0, eoh = 0; 354 bool ok = ParseServerResponse(control_data_, content_length, &peer_id, 355 &eoh); 356 if (ok) { 357 if (my_id_ == -1) { 358 // First response. Let's store our server assigned ID. 359 ASSERT(state_ == SIGNING_IN); 360 my_id_ = static_cast<int>(peer_id); 361 ASSERT(my_id_ != -1); 362 363 // The body of the response will be a list of already connected peers. 364 if (content_length) { 365 size_t pos = eoh + 4; 366 while (pos < control_data_.size()) { 367 size_t eol = control_data_.find('\n', pos); 368 if (eol == std::string::npos) 369 break; 370 int id = 0; 371 std::string name; 372 bool connected; 373 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id, 374 &connected) && id != my_id_) { 375 peers_[id] = name; 376 callback_->OnPeerConnected(id, name); 377 } 378 pos = eol + 1; 379 } 380 } 381 ASSERT(is_connected()); 382 callback_->OnSignedIn(); 383 } else if (state_ == SIGNING_OUT) { 384 Close(); 385 callback_->OnDisconnected(); 386 } else if (state_ == SIGNING_OUT_WAITING) { 387 SignOut(); 388 } 389 } 390 391 control_data_.clear(); 392 393 if (state_ == SIGNING_IN) { 394 ASSERT(hanging_get_->GetState() == talk_base::Socket::CS_CLOSED); 395 state_ = CONNECTED; 396 hanging_get_->Connect(server_address_); 397 } 398 } 399 } 400 401 void PeerConnectionClient::OnHangingGetRead(talk_base::AsyncSocket* socket) { 402 LOG(INFO) << __FUNCTION__; 403 size_t content_length = 0; 404 if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) { 405 size_t peer_id = 0, eoh = 0; 406 bool ok = ParseServerResponse(notification_data_, content_length, 407 &peer_id, &eoh); 408 409 if (ok) { 410 // Store the position where the body begins. 411 size_t pos = eoh + 4; 412 413 if (my_id_ == static_cast<int>(peer_id)) { 414 // A notification about a new member or a member that just 415 // disconnected. 416 int id = 0; 417 std::string name; 418 bool connected = false; 419 if (ParseEntry(notification_data_.substr(pos), &name, &id, 420 &connected)) { 421 if (connected) { 422 peers_[id] = name; 423 callback_->OnPeerConnected(id, name); 424 } else { 425 peers_.erase(id); 426 callback_->OnPeerDisconnected(id); 427 } 428 } 429 } else { 430 OnMessageFromPeer(static_cast<int>(peer_id), 431 notification_data_.substr(pos)); 432 } 433 } 434 435 notification_data_.clear(); 436 } 437 438 if (hanging_get_->GetState() == talk_base::Socket::CS_CLOSED && 439 state_ == CONNECTED) { 440 hanging_get_->Connect(server_address_); 441 } 442 } 443 444 bool PeerConnectionClient::ParseEntry(const std::string& entry, 445 std::string* name, 446 int* id, 447 bool* connected) { 448 ASSERT(name != NULL); 449 ASSERT(id != NULL); 450 ASSERT(connected != NULL); 451 ASSERT(!entry.empty()); 452 453 *connected = false; 454 size_t separator = entry.find(','); 455 if (separator != std::string::npos) { 456 *id = atoi(&entry[separator + 1]); 457 name->assign(entry.substr(0, separator)); 458 separator = entry.find(',', separator + 1); 459 if (separator != std::string::npos) { 460 *connected = atoi(&entry[separator + 1]) ? true : false; 461 } 462 } 463 return !name->empty(); 464 } 465 466 int PeerConnectionClient::GetResponseStatus(const std::string& response) { 467 int status = -1; 468 size_t pos = response.find(' '); 469 if (pos != std::string::npos) 470 status = atoi(&response[pos + 1]); 471 return status; 472 } 473 474 bool PeerConnectionClient::ParseServerResponse(const std::string& response, 475 size_t content_length, 476 size_t* peer_id, 477 size_t* eoh) { 478 int status = GetResponseStatus(response.c_str()); 479 if (status != 200) { 480 LOG(LS_ERROR) << "Received error from server"; 481 Close(); 482 callback_->OnDisconnected(); 483 return false; 484 } 485 486 *eoh = response.find("\r\n\r\n"); 487 ASSERT(*eoh != std::string::npos); 488 if (*eoh == std::string::npos) 489 return false; 490 491 *peer_id = -1; 492 493 // See comment in peer_channel.cc for why we use the Pragma header and 494 // not e.g. "X-Peer-Id". 495 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id); 496 497 return true; 498 } 499 500 void PeerConnectionClient::OnClose(talk_base::AsyncSocket* socket, int err) { 501 LOG(INFO) << __FUNCTION__; 502 503 socket->Close(); 504 505 #ifdef WIN32 506 if (err != WSAECONNREFUSED) { 507 #else 508 if (err != ECONNREFUSED) { 509 #endif 510 if (socket == hanging_get_.get()) { 511 if (state_ == CONNECTED) { 512 hanging_get_->Close(); 513 hanging_get_->Connect(server_address_); 514 } 515 } else { 516 callback_->OnMessageSent(err); 517 } 518 } else { 519 if (socket == control_socket_.get()) { 520 LOG(WARNING) << "Connection refused; retrying in 2 seconds"; 521 talk_base::Thread::Current()->PostDelayed(kReconnectDelay, this, 0); 522 } else { 523 Close(); 524 callback_->OnDisconnected(); 525 } 526 } 527 } 528 529 void PeerConnectionClient::OnMessage(talk_base::Message* msg) { 530 // ignore msg; there is currently only one supported message ("retry") 531 DoConnect(); 532 } 533