1 /* 2 * Copyright 2012 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/examples/peerconnection/client/peer_connection_client.h" 12 13 #include "webrtc/examples/peerconnection/client/defaults.h" 14 #include "webrtc/base/common.h" 15 #include "webrtc/base/logging.h" 16 #include "webrtc/base/nethelpers.h" 17 #include "webrtc/base/stringutils.h" 18 19 #ifdef WIN32 20 #include "webrtc/base/win32socketserver.h" 21 #endif 22 23 using rtc::sprintfn; 24 25 namespace { 26 27 // This is our magical hangup signal. 28 const char kByeMessage[] = "BYE"; 29 // Delay between server connection retries, in milliseconds 30 const int kReconnectDelay = 2000; 31 32 rtc::AsyncSocket* CreateClientSocket(int family) { 33 #ifdef WIN32 34 rtc::Win32Socket* sock = new rtc::Win32Socket(); 35 sock->CreateT(family, SOCK_STREAM); 36 return sock; 37 #elif defined(WEBRTC_POSIX) 38 rtc::Thread* thread = rtc::Thread::Current(); 39 ASSERT(thread != NULL); 40 return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM); 41 #else 42 #error Platform not supported. 43 #endif 44 } 45 46 } // namespace 47 48 PeerConnectionClient::PeerConnectionClient() 49 : callback_(NULL), 50 resolver_(NULL), 51 state_(NOT_CONNECTED), 52 my_id_(-1) { 53 } 54 55 PeerConnectionClient::~PeerConnectionClient() { 56 } 57 58 void PeerConnectionClient::InitSocketSignals() { 59 ASSERT(control_socket_.get() != NULL); 60 ASSERT(hanging_get_.get() != NULL); 61 control_socket_->SignalCloseEvent.connect(this, 62 &PeerConnectionClient::OnClose); 63 hanging_get_->SignalCloseEvent.connect(this, 64 &PeerConnectionClient::OnClose); 65 control_socket_->SignalConnectEvent.connect(this, 66 &PeerConnectionClient::OnConnect); 67 hanging_get_->SignalConnectEvent.connect(this, 68 &PeerConnectionClient::OnHangingGetConnect); 69 control_socket_->SignalReadEvent.connect(this, 70 &PeerConnectionClient::OnRead); 71 hanging_get_->SignalReadEvent.connect(this, 72 &PeerConnectionClient::OnHangingGetRead); 73 } 74 75 int PeerConnectionClient::id() const { 76 return my_id_; 77 } 78 79 bool PeerConnectionClient::is_connected() const { 80 return my_id_ != -1; 81 } 82 83 const Peers& PeerConnectionClient::peers() const { 84 return peers_; 85 } 86 87 void PeerConnectionClient::RegisterObserver( 88 PeerConnectionClientObserver* callback) { 89 ASSERT(!callback_); 90 callback_ = callback; 91 } 92 93 void PeerConnectionClient::Connect(const std::string& server, int port, 94 const std::string& client_name) { 95 ASSERT(!server.empty()); 96 ASSERT(!client_name.empty()); 97 98 if (state_ != NOT_CONNECTED) { 99 LOG(WARNING) 100 << "The client must not be connected before you can call Connect()"; 101 callback_->OnServerConnectionFailure(); 102 return; 103 } 104 105 if (server.empty() || client_name.empty()) { 106 callback_->OnServerConnectionFailure(); 107 return; 108 } 109 110 if (port <= 0) 111 port = kDefaultServerPort; 112 113 server_address_.SetIP(server); 114 server_address_.SetPort(port); 115 client_name_ = client_name; 116 117 if (server_address_.IsUnresolvedIP()) { 118 state_ = RESOLVING; 119 resolver_ = new rtc::AsyncResolver(); 120 resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult); 121 resolver_->Start(server_address_); 122 } else { 123 DoConnect(); 124 } 125 } 126 127 void PeerConnectionClient::OnResolveResult( 128 rtc::AsyncResolverInterface* resolver) { 129 if (resolver_->GetError() != 0) { 130 callback_->OnServerConnectionFailure(); 131 resolver_->Destroy(false); 132 resolver_ = NULL; 133 state_ = NOT_CONNECTED; 134 } else { 135 server_address_ = resolver_->address(); 136 DoConnect(); 137 } 138 } 139 140 void PeerConnectionClient::DoConnect() { 141 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family())); 142 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family())); 143 InitSocketSignals(); 144 char buffer[1024]; 145 sprintfn(buffer, sizeof(buffer), 146 "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str()); 147 onconnect_data_ = buffer; 148 149 bool ret = ConnectControlSocket(); 150 if (ret) 151 state_ = SIGNING_IN; 152 if (!ret) { 153 callback_->OnServerConnectionFailure(); 154 } 155 } 156 157 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) { 158 if (state_ != CONNECTED) 159 return false; 160 161 ASSERT(is_connected()); 162 ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED); 163 if (!is_connected() || peer_id == -1) 164 return false; 165 166 char headers[1024]; 167 sprintfn(headers, sizeof(headers), 168 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n" 169 "Content-Length: %i\r\n" 170 "Content-Type: text/plain\r\n" 171 "\r\n", 172 my_id_, peer_id, message.length()); 173 onconnect_data_ = headers; 174 onconnect_data_ += message; 175 return ConnectControlSocket(); 176 } 177 178 bool PeerConnectionClient::SendHangUp(int peer_id) { 179 return SendToPeer(peer_id, kByeMessage); 180 } 181 182 bool PeerConnectionClient::IsSendingMessage() { 183 return state_ == CONNECTED && 184 control_socket_->GetState() != rtc::Socket::CS_CLOSED; 185 } 186 187 bool PeerConnectionClient::SignOut() { 188 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT) 189 return true; 190 191 if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED) 192 hanging_get_->Close(); 193 194 if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) { 195 state_ = SIGNING_OUT; 196 197 if (my_id_ != -1) { 198 char buffer[1024]; 199 sprintfn(buffer, sizeof(buffer), 200 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_); 201 onconnect_data_ = buffer; 202 return ConnectControlSocket(); 203 } else { 204 // Can occur if the app is closed before we finish connecting. 205 return true; 206 } 207 } else { 208 state_ = SIGNING_OUT_WAITING; 209 } 210 211 return true; 212 } 213 214 void PeerConnectionClient::Close() { 215 control_socket_->Close(); 216 hanging_get_->Close(); 217 onconnect_data_.clear(); 218 peers_.clear(); 219 if (resolver_ != NULL) { 220 resolver_->Destroy(false); 221 resolver_ = NULL; 222 } 223 my_id_ = -1; 224 state_ = NOT_CONNECTED; 225 } 226 227 bool PeerConnectionClient::ConnectControlSocket() { 228 ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED); 229 int err = control_socket_->Connect(server_address_); 230 if (err == SOCKET_ERROR) { 231 Close(); 232 return false; 233 } 234 return true; 235 } 236 237 void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) { 238 ASSERT(!onconnect_data_.empty()); 239 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length()); 240 ASSERT(sent == onconnect_data_.length()); 241 RTC_UNUSED(sent); 242 onconnect_data_.clear(); 243 } 244 245 void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) { 246 char buffer[1024]; 247 sprintfn(buffer, sizeof(buffer), 248 "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_); 249 int len = static_cast<int>(strlen(buffer)); 250 int sent = socket->Send(buffer, len); 251 ASSERT(sent == len); 252 RTC_UNUSED2(sent, len); 253 } 254 255 void PeerConnectionClient::OnMessageFromPeer(int peer_id, 256 const std::string& message) { 257 if (message.length() == (sizeof(kByeMessage) - 1) && 258 message.compare(kByeMessage) == 0) { 259 callback_->OnPeerDisconnected(peer_id); 260 } else { 261 callback_->OnMessageFromPeer(peer_id, message); 262 } 263 } 264 265 bool PeerConnectionClient::GetHeaderValue(const std::string& data, 266 size_t eoh, 267 const char* header_pattern, 268 size_t* value) { 269 ASSERT(value != NULL); 270 size_t found = data.find(header_pattern); 271 if (found != std::string::npos && found < eoh) { 272 *value = atoi(&data[found + strlen(header_pattern)]); 273 return true; 274 } 275 return false; 276 } 277 278 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh, 279 const char* header_pattern, 280 std::string* value) { 281 ASSERT(value != NULL); 282 size_t found = data.find(header_pattern); 283 if (found != std::string::npos && found < eoh) { 284 size_t begin = found + strlen(header_pattern); 285 size_t end = data.find("\r\n", begin); 286 if (end == std::string::npos) 287 end = eoh; 288 value->assign(data.substr(begin, end - begin)); 289 return true; 290 } 291 return false; 292 } 293 294 bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket, 295 std::string* data, 296 size_t* content_length) { 297 char buffer[0xffff]; 298 do { 299 int bytes = socket->Recv(buffer, sizeof(buffer)); 300 if (bytes <= 0) 301 break; 302 data->append(buffer, bytes); 303 } while (true); 304 305 bool ret = false; 306 size_t i = data->find("\r\n\r\n"); 307 if (i != std::string::npos) { 308 LOG(INFO) << "Headers received"; 309 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) { 310 size_t total_response_size = (i + 4) + *content_length; 311 if (data->length() >= total_response_size) { 312 ret = true; 313 std::string should_close; 314 const char kConnection[] = "\r\nConnection: "; 315 if (GetHeaderValue(*data, i, kConnection, &should_close) && 316 should_close.compare("close") == 0) { 317 socket->Close(); 318 // Since we closed the socket, there was no notification delivered 319 // to us. Compensate by letting ourselves know. 320 OnClose(socket, 0); 321 } 322 } else { 323 // We haven't received everything. Just continue to accept data. 324 } 325 } else { 326 LOG(LS_ERROR) << "No content length field specified by the server."; 327 } 328 } 329 return ret; 330 } 331 332 void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) { 333 size_t content_length = 0; 334 if (ReadIntoBuffer(socket, &control_data_, &content_length)) { 335 size_t peer_id = 0, eoh = 0; 336 bool ok = ParseServerResponse(control_data_, content_length, &peer_id, 337 &eoh); 338 if (ok) { 339 if (my_id_ == -1) { 340 // First response. Let's store our server assigned ID. 341 ASSERT(state_ == SIGNING_IN); 342 my_id_ = static_cast<int>(peer_id); 343 ASSERT(my_id_ != -1); 344 345 // The body of the response will be a list of already connected peers. 346 if (content_length) { 347 size_t pos = eoh + 4; 348 while (pos < control_data_.size()) { 349 size_t eol = control_data_.find('\n', pos); 350 if (eol == std::string::npos) 351 break; 352 int id = 0; 353 std::string name; 354 bool connected; 355 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id, 356 &connected) && id != my_id_) { 357 peers_[id] = name; 358 callback_->OnPeerConnected(id, name); 359 } 360 pos = eol + 1; 361 } 362 } 363 ASSERT(is_connected()); 364 callback_->OnSignedIn(); 365 } else if (state_ == SIGNING_OUT) { 366 Close(); 367 callback_->OnDisconnected(); 368 } else if (state_ == SIGNING_OUT_WAITING) { 369 SignOut(); 370 } 371 } 372 373 control_data_.clear(); 374 375 if (state_ == SIGNING_IN) { 376 ASSERT(hanging_get_->GetState() == rtc::Socket::CS_CLOSED); 377 state_ = CONNECTED; 378 hanging_get_->Connect(server_address_); 379 } 380 } 381 } 382 383 void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) { 384 LOG(INFO) << __FUNCTION__; 385 size_t content_length = 0; 386 if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) { 387 size_t peer_id = 0, eoh = 0; 388 bool ok = ParseServerResponse(notification_data_, content_length, 389 &peer_id, &eoh); 390 391 if (ok) { 392 // Store the position where the body begins. 393 size_t pos = eoh + 4; 394 395 if (my_id_ == static_cast<int>(peer_id)) { 396 // A notification about a new member or a member that just 397 // disconnected. 398 int id = 0; 399 std::string name; 400 bool connected = false; 401 if (ParseEntry(notification_data_.substr(pos), &name, &id, 402 &connected)) { 403 if (connected) { 404 peers_[id] = name; 405 callback_->OnPeerConnected(id, name); 406 } else { 407 peers_.erase(id); 408 callback_->OnPeerDisconnected(id); 409 } 410 } 411 } else { 412 OnMessageFromPeer(static_cast<int>(peer_id), 413 notification_data_.substr(pos)); 414 } 415 } 416 417 notification_data_.clear(); 418 } 419 420 if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED && 421 state_ == CONNECTED) { 422 hanging_get_->Connect(server_address_); 423 } 424 } 425 426 bool PeerConnectionClient::ParseEntry(const std::string& entry, 427 std::string* name, 428 int* id, 429 bool* connected) { 430 ASSERT(name != NULL); 431 ASSERT(id != NULL); 432 ASSERT(connected != NULL); 433 ASSERT(!entry.empty()); 434 435 *connected = false; 436 size_t separator = entry.find(','); 437 if (separator != std::string::npos) { 438 *id = atoi(&entry[separator + 1]); 439 name->assign(entry.substr(0, separator)); 440 separator = entry.find(',', separator + 1); 441 if (separator != std::string::npos) { 442 *connected = atoi(&entry[separator + 1]) ? true : false; 443 } 444 } 445 return !name->empty(); 446 } 447 448 int PeerConnectionClient::GetResponseStatus(const std::string& response) { 449 int status = -1; 450 size_t pos = response.find(' '); 451 if (pos != std::string::npos) 452 status = atoi(&response[pos + 1]); 453 return status; 454 } 455 456 bool PeerConnectionClient::ParseServerResponse(const std::string& response, 457 size_t content_length, 458 size_t* peer_id, 459 size_t* eoh) { 460 int status = GetResponseStatus(response.c_str()); 461 if (status != 200) { 462 LOG(LS_ERROR) << "Received error from server"; 463 Close(); 464 callback_->OnDisconnected(); 465 return false; 466 } 467 468 *eoh = response.find("\r\n\r\n"); 469 ASSERT(*eoh != std::string::npos); 470 if (*eoh == std::string::npos) 471 return false; 472 473 *peer_id = -1; 474 475 // See comment in peer_channel.cc for why we use the Pragma header and 476 // not e.g. "X-Peer-Id". 477 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id); 478 479 return true; 480 } 481 482 void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) { 483 LOG(INFO) << __FUNCTION__; 484 485 socket->Close(); 486 487 #ifdef WIN32 488 if (err != WSAECONNREFUSED) { 489 #else 490 if (err != ECONNREFUSED) { 491 #endif 492 if (socket == hanging_get_.get()) { 493 if (state_ == CONNECTED) { 494 hanging_get_->Close(); 495 hanging_get_->Connect(server_address_); 496 } 497 } else { 498 callback_->OnMessageSent(err); 499 } 500 } else { 501 if (socket == control_socket_.get()) { 502 LOG(WARNING) << "Connection refused; retrying in 2 seconds"; 503 rtc::Thread::Current()->PostDelayed(kReconnectDelay, this, 0); 504 } else { 505 Close(); 506 callback_->OnDisconnected(); 507 } 508 } 509 } 510 511 void PeerConnectionClient::OnMessage(rtc::Message* msg) { 512 // ignore msg; there is currently only one supported message ("retry") 513 DoConnect(); 514 } 515