1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/base/virtualsocketserver.h" 12 13 #include <errno.h> 14 #include <math.h> 15 16 #include <algorithm> 17 #include <map> 18 #include <vector> 19 20 #include "webrtc/base/checks.h" 21 #include "webrtc/base/common.h" 22 #include "webrtc/base/logging.h" 23 #include "webrtc/base/physicalsocketserver.h" 24 #include "webrtc/base/socketaddresspair.h" 25 #include "webrtc/base/thread.h" 26 #include "webrtc/base/timeutils.h" 27 28 namespace rtc { 29 #if defined(WEBRTC_WIN) 30 const in_addr kInitialNextIPv4 = { {0x01, 0, 0, 0} }; 31 #else 32 // This value is entirely arbitrary, hence the lack of concern about endianness. 33 const in_addr kInitialNextIPv4 = { 0x01000000 }; 34 #endif 35 // Starts at ::2 so as to not cause confusion with ::1. 36 const in6_addr kInitialNextIPv6 = { { { 37 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2 38 } } }; 39 40 const uint16_t kFirstEphemeralPort = 49152; 41 const uint16_t kLastEphemeralPort = 65535; 42 const uint16_t kEphemeralPortCount = 43 kLastEphemeralPort - kFirstEphemeralPort + 1; 44 const uint32_t kDefaultNetworkCapacity = 64 * 1024; 45 const uint32_t kDefaultTcpBufferSize = 32 * 1024; 46 47 const uint32_t UDP_HEADER_SIZE = 28; // IP + UDP headers 48 const uint32_t TCP_HEADER_SIZE = 40; // IP + TCP headers 49 const uint32_t TCP_MSS = 1400; // Maximum segment size 50 51 // Note: The current algorithm doesn't work for sample sizes smaller than this. 52 const int NUM_SAMPLES = 1000; 53 54 enum { 55 MSG_ID_PACKET, 56 MSG_ID_ADDRESS_BOUND, 57 MSG_ID_CONNECT, 58 MSG_ID_DISCONNECT, 59 }; 60 61 // Packets are passed between sockets as messages. We copy the data just like 62 // the kernel does. 63 class Packet : public MessageData { 64 public: 65 Packet(const char* data, size_t size, const SocketAddress& from) 66 : size_(size), consumed_(0), from_(from) { 67 ASSERT(NULL != data); 68 data_ = new char[size_]; 69 memcpy(data_, data, size_); 70 } 71 72 ~Packet() override { 73 delete[] data_; 74 } 75 76 const char* data() const { return data_ + consumed_; } 77 size_t size() const { return size_ - consumed_; } 78 const SocketAddress& from() const { return from_; } 79 80 // Remove the first size bytes from the data. 81 void Consume(size_t size) { 82 ASSERT(size + consumed_ < size_); 83 consumed_ += size; 84 } 85 86 private: 87 char* data_; 88 size_t size_, consumed_; 89 SocketAddress from_; 90 }; 91 92 struct MessageAddress : public MessageData { 93 explicit MessageAddress(const SocketAddress& a) : addr(a) { } 94 SocketAddress addr; 95 }; 96 97 VirtualSocket::VirtualSocket(VirtualSocketServer* server, 98 int family, 99 int type, 100 bool async) 101 : server_(server), 102 type_(type), 103 async_(async), 104 state_(CS_CLOSED), 105 error_(0), 106 listen_queue_(NULL), 107 write_enabled_(false), 108 network_size_(0), 109 recv_buffer_size_(0), 110 bound_(false), 111 was_any_(false) { 112 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); 113 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams 114 } 115 116 VirtualSocket::~VirtualSocket() { 117 Close(); 118 119 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); 120 ++it) { 121 delete *it; 122 } 123 } 124 125 SocketAddress VirtualSocket::GetLocalAddress() const { 126 if (!alternative_local_addr_.IsNil()) 127 return alternative_local_addr_; 128 return local_addr_; 129 } 130 131 SocketAddress VirtualSocket::GetRemoteAddress() const { 132 return remote_addr_; 133 } 134 135 void VirtualSocket::SetLocalAddress(const SocketAddress& addr) { 136 local_addr_ = addr; 137 } 138 139 void VirtualSocket::SetAlternativeLocalAddress(const SocketAddress& addr) { 140 alternative_local_addr_ = addr; 141 } 142 143 int VirtualSocket::Bind(const SocketAddress& addr) { 144 if (!local_addr_.IsNil()) { 145 error_ = EINVAL; 146 return -1; 147 } 148 local_addr_ = addr; 149 int result = server_->Bind(this, &local_addr_); 150 if (result != 0) { 151 local_addr_.Clear(); 152 error_ = EADDRINUSE; 153 } else { 154 bound_ = true; 155 was_any_ = addr.IsAnyIP(); 156 // Post a message here such that test case could have chance to 157 // process the local address. (i.e. SetAlternativeLocalAddress). 158 server_->msg_queue_->Post(this, MSG_ID_ADDRESS_BOUND); 159 } 160 return result; 161 } 162 163 int VirtualSocket::Connect(const SocketAddress& addr) { 164 return InitiateConnect(addr, true); 165 } 166 167 int VirtualSocket::Close() { 168 if (!local_addr_.IsNil() && bound_) { 169 // Remove from the binding table. 170 server_->Unbind(local_addr_, this); 171 bound_ = false; 172 } 173 174 if (SOCK_STREAM == type_) { 175 // Cancel pending sockets 176 if (listen_queue_) { 177 while (!listen_queue_->empty()) { 178 SocketAddress addr = listen_queue_->front(); 179 180 // Disconnect listening socket. 181 server_->Disconnect(server_->LookupBinding(addr)); 182 listen_queue_->pop_front(); 183 } 184 delete listen_queue_; 185 listen_queue_ = NULL; 186 } 187 // Disconnect stream sockets 188 if (CS_CONNECTED == state_) { 189 // Disconnect remote socket, check if it is a child of a server socket. 190 VirtualSocket* socket = 191 server_->LookupConnection(local_addr_, remote_addr_); 192 if (!socket) { 193 // Not a server socket child, then see if it is bound. 194 // TODO(tbd): If this is indeed a server socket that has no 195 // children this will cause the server socket to be 196 // closed. This might lead to unexpected results, how to fix this? 197 socket = server_->LookupBinding(remote_addr_); 198 } 199 server_->Disconnect(socket); 200 201 // Remove mapping for both directions. 202 server_->RemoveConnection(remote_addr_, local_addr_); 203 server_->RemoveConnection(local_addr_, remote_addr_); 204 } 205 // Cancel potential connects 206 MessageList msgs; 207 if (server_->msg_queue_) { 208 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); 209 } 210 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { 211 ASSERT(NULL != it->pdata); 212 MessageAddress* data = static_cast<MessageAddress*>(it->pdata); 213 214 // Lookup remote side. 215 VirtualSocket* socket = 216 server_->LookupConnection(local_addr_, data->addr); 217 if (socket) { 218 // Server socket, remote side is a socket retreived by 219 // accept. Accepted sockets are not bound so we will not 220 // find it by looking in the bindings table. 221 server_->Disconnect(socket); 222 server_->RemoveConnection(local_addr_, data->addr); 223 } else { 224 server_->Disconnect(server_->LookupBinding(data->addr)); 225 } 226 delete data; 227 } 228 // Clear incoming packets and disconnect messages 229 if (server_->msg_queue_) { 230 server_->msg_queue_->Clear(this); 231 } 232 } 233 234 state_ = CS_CLOSED; 235 local_addr_.Clear(); 236 remote_addr_.Clear(); 237 return 0; 238 } 239 240 int VirtualSocket::Send(const void* pv, size_t cb) { 241 if (CS_CONNECTED != state_) { 242 error_ = ENOTCONN; 243 return -1; 244 } 245 if (SOCK_DGRAM == type_) { 246 return SendUdp(pv, cb, remote_addr_); 247 } else { 248 return SendTcp(pv, cb); 249 } 250 } 251 252 int VirtualSocket::SendTo(const void* pv, 253 size_t cb, 254 const SocketAddress& addr) { 255 if (SOCK_DGRAM == type_) { 256 return SendUdp(pv, cb, addr); 257 } else { 258 if (CS_CONNECTED != state_) { 259 error_ = ENOTCONN; 260 return -1; 261 } 262 return SendTcp(pv, cb); 263 } 264 } 265 266 int VirtualSocket::Recv(void* pv, size_t cb) { 267 SocketAddress addr; 268 return RecvFrom(pv, cb, &addr); 269 } 270 271 int VirtualSocket::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) { 272 // If we don't have a packet, then either error or wait for one to arrive. 273 if (recv_buffer_.empty()) { 274 if (async_) { 275 error_ = EAGAIN; 276 return -1; 277 } 278 while (recv_buffer_.empty()) { 279 Message msg; 280 server_->msg_queue_->Get(&msg); 281 server_->msg_queue_->Dispatch(&msg); 282 } 283 } 284 285 // Return the packet at the front of the queue. 286 Packet* packet = recv_buffer_.front(); 287 size_t data_read = std::min(cb, packet->size()); 288 memcpy(pv, packet->data(), data_read); 289 *paddr = packet->from(); 290 291 if (data_read < packet->size()) { 292 packet->Consume(data_read); 293 } else { 294 recv_buffer_.pop_front(); 295 delete packet; 296 } 297 298 if (SOCK_STREAM == type_) { 299 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); 300 recv_buffer_size_ -= data_read; 301 if (was_full) { 302 VirtualSocket* sender = server_->LookupBinding(remote_addr_); 303 ASSERT(NULL != sender); 304 server_->SendTcp(sender); 305 } 306 } 307 308 return static_cast<int>(data_read); 309 } 310 311 int VirtualSocket::Listen(int backlog) { 312 ASSERT(SOCK_STREAM == type_); 313 ASSERT(CS_CLOSED == state_); 314 if (local_addr_.IsNil()) { 315 error_ = EINVAL; 316 return -1; 317 } 318 ASSERT(NULL == listen_queue_); 319 listen_queue_ = new ListenQueue; 320 state_ = CS_CONNECTING; 321 return 0; 322 } 323 324 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { 325 if (NULL == listen_queue_) { 326 error_ = EINVAL; 327 return NULL; 328 } 329 while (!listen_queue_->empty()) { 330 VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_, async_); 331 332 // Set the new local address to the same as this server socket. 333 socket->SetLocalAddress(local_addr_); 334 // Sockets made from a socket that 'was Any' need to inherit that. 335 socket->set_was_any(was_any_); 336 SocketAddress remote_addr(listen_queue_->front()); 337 int result = socket->InitiateConnect(remote_addr, false); 338 listen_queue_->pop_front(); 339 if (result != 0) { 340 delete socket; 341 continue; 342 } 343 socket->CompleteConnect(remote_addr, false); 344 if (paddr) { 345 *paddr = remote_addr; 346 } 347 return socket; 348 } 349 error_ = EWOULDBLOCK; 350 return NULL; 351 } 352 353 int VirtualSocket::GetError() const { 354 return error_; 355 } 356 357 void VirtualSocket::SetError(int error) { 358 error_ = error; 359 } 360 361 Socket::ConnState VirtualSocket::GetState() const { 362 return state_; 363 } 364 365 int VirtualSocket::GetOption(Option opt, int* value) { 366 OptionsMap::const_iterator it = options_map_.find(opt); 367 if (it == options_map_.end()) { 368 return -1; 369 } 370 *value = it->second; 371 return 0; // 0 is success to emulate getsockopt() 372 } 373 374 int VirtualSocket::SetOption(Option opt, int value) { 375 options_map_[opt] = value; 376 return 0; // 0 is success to emulate setsockopt() 377 } 378 379 int VirtualSocket::EstimateMTU(uint16_t* mtu) { 380 if (CS_CONNECTED != state_) 381 return ENOTCONN; 382 else 383 return 65536; 384 } 385 386 void VirtualSocket::OnMessage(Message* pmsg) { 387 if (pmsg->message_id == MSG_ID_PACKET) { 388 // ASSERT(!local_addr_.IsAnyIP()); 389 ASSERT(NULL != pmsg->pdata); 390 Packet* packet = static_cast<Packet*>(pmsg->pdata); 391 392 recv_buffer_.push_back(packet); 393 394 if (async_) { 395 SignalReadEvent(this); 396 } 397 } else if (pmsg->message_id == MSG_ID_CONNECT) { 398 ASSERT(NULL != pmsg->pdata); 399 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); 400 if (listen_queue_ != NULL) { 401 listen_queue_->push_back(data->addr); 402 if (async_) { 403 SignalReadEvent(this); 404 } 405 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { 406 CompleteConnect(data->addr, true); 407 } else { 408 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; 409 server_->Disconnect(server_->LookupBinding(data->addr)); 410 } 411 delete data; 412 } else if (pmsg->message_id == MSG_ID_DISCONNECT) { 413 ASSERT(SOCK_STREAM == type_); 414 if (CS_CLOSED != state_) { 415 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; 416 state_ = CS_CLOSED; 417 remote_addr_.Clear(); 418 if (async_) { 419 SignalCloseEvent(this, error); 420 } 421 } 422 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { 423 SignalAddressReady(this, GetLocalAddress()); 424 } else { 425 ASSERT(false); 426 } 427 } 428 429 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { 430 if (!remote_addr_.IsNil()) { 431 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; 432 return -1; 433 } 434 if (local_addr_.IsNil()) { 435 // If there's no local address set, grab a random one in the correct AF. 436 int result = 0; 437 if (addr.ipaddr().family() == AF_INET) { 438 result = Bind(SocketAddress("0.0.0.0", 0)); 439 } else if (addr.ipaddr().family() == AF_INET6) { 440 result = Bind(SocketAddress("::", 0)); 441 } 442 if (result != 0) { 443 return result; 444 } 445 } 446 if (type_ == SOCK_DGRAM) { 447 remote_addr_ = addr; 448 state_ = CS_CONNECTED; 449 } else { 450 int result = server_->Connect(this, addr, use_delay); 451 if (result != 0) { 452 error_ = EHOSTUNREACH; 453 return -1; 454 } 455 state_ = CS_CONNECTING; 456 } 457 return 0; 458 } 459 460 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { 461 ASSERT(CS_CONNECTING == state_); 462 remote_addr_ = addr; 463 state_ = CS_CONNECTED; 464 server_->AddConnection(remote_addr_, local_addr_, this); 465 if (async_ && notify) { 466 SignalConnectEvent(this); 467 } 468 } 469 470 int VirtualSocket::SendUdp(const void* pv, 471 size_t cb, 472 const SocketAddress& addr) { 473 // If we have not been assigned a local port, then get one. 474 if (local_addr_.IsNil()) { 475 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); 476 int result = server_->Bind(this, &local_addr_); 477 if (result != 0) { 478 local_addr_.Clear(); 479 error_ = EADDRINUSE; 480 return result; 481 } 482 } 483 484 // Send the data in a message to the appropriate socket. 485 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); 486 } 487 488 int VirtualSocket::SendTcp(const void* pv, size_t cb) { 489 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); 490 if (0 == capacity) { 491 write_enabled_ = true; 492 error_ = EWOULDBLOCK; 493 return -1; 494 } 495 size_t consumed = std::min(cb, capacity); 496 const char* cpv = static_cast<const char*>(pv); 497 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); 498 server_->SendTcp(this); 499 return static_cast<int>(consumed); 500 } 501 502 VirtualSocketServer::VirtualSocketServer(SocketServer* ss) 503 : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false), 504 network_delay_(Time()), next_ipv4_(kInitialNextIPv4), 505 next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort), 506 bindings_(new AddressMap()), connections_(new ConnectionMap()), 507 bandwidth_(0), network_capacity_(kDefaultNetworkCapacity), 508 send_buffer_capacity_(kDefaultTcpBufferSize), 509 recv_buffer_capacity_(kDefaultTcpBufferSize), 510 delay_mean_(0), delay_stddev_(0), delay_samples_(NUM_SAMPLES), 511 delay_dist_(NULL), drop_prob_(0.0) { 512 if (!server_) { 513 server_ = new PhysicalSocketServer(); 514 server_owned_ = true; 515 } 516 UpdateDelayDistribution(); 517 } 518 519 VirtualSocketServer::~VirtualSocketServer() { 520 delete bindings_; 521 delete connections_; 522 delete delay_dist_; 523 if (server_owned_) { 524 delete server_; 525 } 526 } 527 528 IPAddress VirtualSocketServer::GetNextIP(int family) { 529 if (family == AF_INET) { 530 IPAddress next_ip(next_ipv4_); 531 next_ipv4_.s_addr = 532 HostToNetwork32(NetworkToHost32(next_ipv4_.s_addr) + 1); 533 return next_ip; 534 } else if (family == AF_INET6) { 535 IPAddress next_ip(next_ipv6_); 536 uint32_t* as_ints = reinterpret_cast<uint32_t*>(&next_ipv6_.s6_addr); 537 as_ints[3] += 1; 538 return next_ip; 539 } 540 return IPAddress(); 541 } 542 543 uint16_t VirtualSocketServer::GetNextPort() { 544 uint16_t port = next_port_; 545 if (next_port_ < kLastEphemeralPort) { 546 ++next_port_; 547 } else { 548 next_port_ = kFirstEphemeralPort; 549 } 550 return port; 551 } 552 553 Socket* VirtualSocketServer::CreateSocket(int type) { 554 return CreateSocket(AF_INET, type); 555 } 556 557 Socket* VirtualSocketServer::CreateSocket(int family, int type) { 558 return CreateSocketInternal(family, type); 559 } 560 561 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { 562 return CreateAsyncSocket(AF_INET, type); 563 } 564 565 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) { 566 return CreateSocketInternal(family, type); 567 } 568 569 VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) { 570 return new VirtualSocket(this, family, type, true); 571 } 572 573 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { 574 msg_queue_ = msg_queue; 575 if (msg_queue_) { 576 msg_queue_->SignalQueueDestroyed.connect(this, 577 &VirtualSocketServer::OnMessageQueueDestroyed); 578 } 579 } 580 581 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { 582 ASSERT(msg_queue_ == Thread::Current()); 583 if (stop_on_idle_ && Thread::Current()->empty()) { 584 return false; 585 } 586 return socketserver()->Wait(cmsWait, process_io); 587 } 588 589 void VirtualSocketServer::WakeUp() { 590 socketserver()->WakeUp(); 591 } 592 593 bool VirtualSocketServer::ProcessMessagesUntilIdle() { 594 ASSERT(msg_queue_ == Thread::Current()); 595 stop_on_idle_ = true; 596 while (!msg_queue_->empty()) { 597 Message msg; 598 if (msg_queue_->Get(&msg, Thread::kForever)) { 599 msg_queue_->Dispatch(&msg); 600 } 601 } 602 stop_on_idle_ = false; 603 return !msg_queue_->IsQuitting(); 604 } 605 606 void VirtualSocketServer::SetNextPortForTesting(uint16_t port) { 607 next_port_ = port; 608 } 609 610 bool VirtualSocketServer::CloseTcpConnections( 611 const SocketAddress& addr_local, 612 const SocketAddress& addr_remote) { 613 VirtualSocket* socket = LookupConnection(addr_local, addr_remote); 614 if (!socket) { 615 return false; 616 } 617 // Signal the close event on the local connection first. 618 socket->SignalCloseEvent(socket, 0); 619 620 // Trigger the remote connection's close event. 621 socket->Close(); 622 623 return true; 624 } 625 626 int VirtualSocketServer::Bind(VirtualSocket* socket, 627 const SocketAddress& addr) { 628 ASSERT(NULL != socket); 629 // Address must be completely specified at this point 630 ASSERT(!IPIsUnspec(addr.ipaddr())); 631 ASSERT(addr.port() != 0); 632 633 // Normalize the address (turns v6-mapped addresses into v4-addresses). 634 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); 635 636 AddressMap::value_type entry(normalized, socket); 637 return bindings_->insert(entry).second ? 0 : -1; 638 } 639 640 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { 641 ASSERT(NULL != socket); 642 643 if (!IPIsUnspec(addr->ipaddr())) { 644 addr->SetIP(addr->ipaddr().Normalized()); 645 } else { 646 ASSERT(false); 647 } 648 649 if (addr->port() == 0) { 650 for (int i = 0; i < kEphemeralPortCount; ++i) { 651 addr->SetPort(GetNextPort()); 652 if (bindings_->find(*addr) == bindings_->end()) { 653 break; 654 } 655 } 656 } 657 658 return Bind(socket, *addr); 659 } 660 661 VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) { 662 SocketAddress normalized(addr.ipaddr().Normalized(), 663 addr.port()); 664 AddressMap::iterator it = bindings_->find(normalized); 665 if (it != bindings_->end()) { 666 return it->second; 667 } 668 669 IPAddress default_ip = GetDefaultRoute(addr.ipaddr().family()); 670 if (!IPIsUnspec(default_ip) && addr.ipaddr() == default_ip) { 671 // If we can't find a binding for the packet which is sent to the interface 672 // corresponding to the default route, it should match a binding with the 673 // correct port to the any address. 674 SocketAddress sock_addr = 675 EmptySocketAddressWithFamily(addr.ipaddr().family()); 676 sock_addr.SetPort(addr.port()); 677 return LookupBinding(sock_addr); 678 } 679 680 return nullptr; 681 } 682 683 int VirtualSocketServer::Unbind(const SocketAddress& addr, 684 VirtualSocket* socket) { 685 SocketAddress normalized(addr.ipaddr().Normalized(), 686 addr.port()); 687 ASSERT((*bindings_)[normalized] == socket); 688 bindings_->erase(bindings_->find(normalized)); 689 return 0; 690 } 691 692 void VirtualSocketServer::AddConnection(const SocketAddress& local, 693 const SocketAddress& remote, 694 VirtualSocket* remote_socket) { 695 // Add this socket pair to our routing table. This will allow 696 // multiple clients to connect to the same server address. 697 SocketAddress local_normalized(local.ipaddr().Normalized(), 698 local.port()); 699 SocketAddress remote_normalized(remote.ipaddr().Normalized(), 700 remote.port()); 701 SocketAddressPair address_pair(local_normalized, remote_normalized); 702 connections_->insert(std::pair<SocketAddressPair, 703 VirtualSocket*>(address_pair, remote_socket)); 704 } 705 706 VirtualSocket* VirtualSocketServer::LookupConnection( 707 const SocketAddress& local, 708 const SocketAddress& remote) { 709 SocketAddress local_normalized(local.ipaddr().Normalized(), 710 local.port()); 711 SocketAddress remote_normalized(remote.ipaddr().Normalized(), 712 remote.port()); 713 SocketAddressPair address_pair(local_normalized, remote_normalized); 714 ConnectionMap::iterator it = connections_->find(address_pair); 715 return (connections_->end() != it) ? it->second : NULL; 716 } 717 718 void VirtualSocketServer::RemoveConnection(const SocketAddress& local, 719 const SocketAddress& remote) { 720 SocketAddress local_normalized(local.ipaddr().Normalized(), 721 local.port()); 722 SocketAddress remote_normalized(remote.ipaddr().Normalized(), 723 remote.port()); 724 SocketAddressPair address_pair(local_normalized, remote_normalized); 725 connections_->erase(address_pair); 726 } 727 728 static double Random() { 729 return static_cast<double>(rand()) / RAND_MAX; 730 } 731 732 int VirtualSocketServer::Connect(VirtualSocket* socket, 733 const SocketAddress& remote_addr, 734 bool use_delay) { 735 uint32_t delay = use_delay ? GetRandomTransitDelay() : 0; 736 VirtualSocket* remote = LookupBinding(remote_addr); 737 if (!CanInteractWith(socket, remote)) { 738 LOG(LS_INFO) << "Address family mismatch between " 739 << socket->GetLocalAddress() << " and " << remote_addr; 740 return -1; 741 } 742 if (remote != NULL) { 743 SocketAddress addr = socket->GetLocalAddress(); 744 msg_queue_->PostDelayed(delay, remote, MSG_ID_CONNECT, 745 new MessageAddress(addr)); 746 } else { 747 LOG(LS_INFO) << "No one listening at " << remote_addr; 748 msg_queue_->PostDelayed(delay, socket, MSG_ID_DISCONNECT); 749 } 750 return 0; 751 } 752 753 bool VirtualSocketServer::Disconnect(VirtualSocket* socket) { 754 if (socket) { 755 // Remove the mapping. 756 msg_queue_->Post(socket, MSG_ID_DISCONNECT); 757 return true; 758 } 759 return false; 760 } 761 762 int VirtualSocketServer::SendUdp(VirtualSocket* socket, 763 const char* data, size_t data_size, 764 const SocketAddress& remote_addr) { 765 // See if we want to drop this packet. 766 if (Random() < drop_prob_) { 767 LOG(LS_VERBOSE) << "Dropping packet: bad luck"; 768 return static_cast<int>(data_size); 769 } 770 771 VirtualSocket* recipient = LookupBinding(remote_addr); 772 if (!recipient) { 773 // Make a fake recipient for address family checking. 774 scoped_ptr<VirtualSocket> dummy_socket( 775 CreateSocketInternal(AF_INET, SOCK_DGRAM)); 776 dummy_socket->SetLocalAddress(remote_addr); 777 if (!CanInteractWith(socket, dummy_socket.get())) { 778 LOG(LS_VERBOSE) << "Incompatible address families: " 779 << socket->GetLocalAddress() << " and " << remote_addr; 780 return -1; 781 } 782 LOG(LS_VERBOSE) << "No one listening at " << remote_addr; 783 return static_cast<int>(data_size); 784 } 785 786 if (!CanInteractWith(socket, recipient)) { 787 LOG(LS_VERBOSE) << "Incompatible address families: " 788 << socket->GetLocalAddress() << " and " << remote_addr; 789 return -1; 790 } 791 792 CritScope cs(&socket->crit_); 793 794 uint32_t cur_time = Time(); 795 PurgeNetworkPackets(socket, cur_time); 796 797 // Determine whether we have enough bandwidth to accept this packet. To do 798 // this, we need to update the send queue. Once we know it's current size, 799 // we know whether we can fit this packet. 800 // 801 // NOTE: There are better algorithms for maintaining such a queue (such as 802 // "Derivative Random Drop"); however, this algorithm is a more accurate 803 // simulation of what a normal network would do. 804 805 size_t packet_size = data_size + UDP_HEADER_SIZE; 806 if (socket->network_size_ + packet_size > network_capacity_) { 807 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; 808 return static_cast<int>(data_size); 809 } 810 811 AddPacketToNetwork(socket, recipient, cur_time, data, data_size, 812 UDP_HEADER_SIZE, false); 813 814 return static_cast<int>(data_size); 815 } 816 817 void VirtualSocketServer::SendTcp(VirtualSocket* socket) { 818 // TCP can't send more data than will fill up the receiver's buffer. 819 // We track the data that is in the buffer plus data in flight using the 820 // recipient's recv_buffer_size_. Anything beyond that must be stored in the 821 // sender's buffer. We will trigger the buffered data to be sent when data 822 // is read from the recv_buffer. 823 824 // Lookup the local/remote pair in the connections table. 825 VirtualSocket* recipient = LookupConnection(socket->local_addr_, 826 socket->remote_addr_); 827 if (!recipient) { 828 LOG(LS_VERBOSE) << "Sending data to no one."; 829 return; 830 } 831 832 CritScope cs(&socket->crit_); 833 834 uint32_t cur_time = Time(); 835 PurgeNetworkPackets(socket, cur_time); 836 837 while (true) { 838 size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_; 839 size_t max_data_size = 840 std::min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE); 841 size_t data_size = std::min(socket->send_buffer_.size(), max_data_size); 842 if (0 == data_size) 843 break; 844 845 AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0], 846 data_size, TCP_HEADER_SIZE, true); 847 recipient->recv_buffer_size_ += data_size; 848 849 size_t new_buffer_size = socket->send_buffer_.size() - data_size; 850 // Avoid undefined access beyond the last element of the vector. 851 // This only happens when new_buffer_size is 0. 852 if (data_size < socket->send_buffer_.size()) { 853 // memmove is required for potentially overlapping source/destination. 854 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size], 855 new_buffer_size); 856 } 857 socket->send_buffer_.resize(new_buffer_size); 858 } 859 860 if (socket->write_enabled_ 861 && (socket->send_buffer_.size() < send_buffer_capacity_)) { 862 socket->write_enabled_ = false; 863 socket->SignalWriteEvent(socket); 864 } 865 } 866 867 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender, 868 VirtualSocket* recipient, 869 uint32_t cur_time, 870 const char* data, 871 size_t data_size, 872 size_t header_size, 873 bool ordered) { 874 VirtualSocket::NetworkEntry entry; 875 entry.size = data_size + header_size; 876 877 sender->network_size_ += entry.size; 878 uint32_t send_delay = SendDelay(static_cast<uint32_t>(sender->network_size_)); 879 entry.done_time = cur_time + send_delay; 880 sender->network_.push_back(entry); 881 882 // Find the delay for crossing the many virtual hops of the network. 883 uint32_t transit_delay = GetRandomTransitDelay(); 884 885 // When the incoming packet is from a binding of the any address, translate it 886 // to the default route here such that the recipient will see the default 887 // route. 888 SocketAddress sender_addr = sender->local_addr_; 889 IPAddress default_ip = GetDefaultRoute(sender_addr.ipaddr().family()); 890 if (sender_addr.IsAnyIP() && !IPIsUnspec(default_ip)) { 891 sender_addr.SetIP(default_ip); 892 } 893 894 // Post the packet as a message to be delivered (on our own thread) 895 Packet* p = new Packet(data, data_size, sender_addr); 896 897 uint32_t ts = TimeAfter(send_delay + transit_delay); 898 if (ordered) { 899 // Ensure that new packets arrive after previous ones 900 // TODO: consider ordering on a per-socket basis, since this 901 // introduces artifical delay. 902 ts = TimeMax(ts, network_delay_); 903 } 904 msg_queue_->PostAt(ts, recipient, MSG_ID_PACKET, p); 905 network_delay_ = TimeMax(ts, network_delay_); 906 } 907 908 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, 909 uint32_t cur_time) { 910 while (!socket->network_.empty() && 911 (socket->network_.front().done_time <= cur_time)) { 912 ASSERT(socket->network_size_ >= socket->network_.front().size); 913 socket->network_size_ -= socket->network_.front().size; 914 socket->network_.pop_front(); 915 } 916 } 917 918 uint32_t VirtualSocketServer::SendDelay(uint32_t size) { 919 if (bandwidth_ == 0) 920 return 0; 921 else 922 return 1000 * size / bandwidth_; 923 } 924 925 #if 0 926 void PrintFunction(std::vector<std::pair<double, double> >* f) { 927 return; 928 double sum = 0; 929 for (uint32_t i = 0; i < f->size(); ++i) { 930 std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl; 931 sum += (*f)[i].second; 932 } 933 if (!f->empty()) { 934 const double mean = sum / f->size(); 935 double sum_sq_dev = 0; 936 for (uint32_t i = 0; i < f->size(); ++i) { 937 double dev = (*f)[i].second - mean; 938 sum_sq_dev += dev * dev; 939 } 940 std::cout << "Mean = " << mean << " StdDev = " 941 << sqrt(sum_sq_dev / f->size()) << std::endl; 942 } 943 } 944 #endif // <unused> 945 946 void VirtualSocketServer::UpdateDelayDistribution() { 947 Function* dist = CreateDistribution(delay_mean_, delay_stddev_, 948 delay_samples_); 949 // We take a lock just to make sure we don't leak memory. 950 { 951 CritScope cs(&delay_crit_); 952 delete delay_dist_; 953 delay_dist_ = dist; 954 } 955 } 956 957 static double PI = 4 * atan(1.0); 958 959 static double Normal(double x, double mean, double stddev) { 960 double a = (x - mean) * (x - mean) / (2 * stddev * stddev); 961 return exp(-a) / (stddev * sqrt(2 * PI)); 962 } 963 964 #if 0 // static unused gives a warning 965 static double Pareto(double x, double min, double k) { 966 if (x < min) 967 return 0; 968 else 969 return k * std::pow(min, k) / std::pow(x, k+1); 970 } 971 #endif 972 973 VirtualSocketServer::Function* VirtualSocketServer::CreateDistribution( 974 uint32_t mean, 975 uint32_t stddev, 976 uint32_t samples) { 977 Function* f = new Function(); 978 979 if (0 == stddev) { 980 f->push_back(Point(mean, 1.0)); 981 } else { 982 double start = 0; 983 if (mean >= 4 * static_cast<double>(stddev)) 984 start = mean - 4 * static_cast<double>(stddev); 985 double end = mean + 4 * static_cast<double>(stddev); 986 987 for (uint32_t i = 0; i < samples; i++) { 988 double x = start + (end - start) * i / (samples - 1); 989 double y = Normal(x, mean, stddev); 990 f->push_back(Point(x, y)); 991 } 992 } 993 return Resample(Invert(Accumulate(f)), 0, 1, samples); 994 } 995 996 uint32_t VirtualSocketServer::GetRandomTransitDelay() { 997 size_t index = rand() % delay_dist_->size(); 998 double delay = (*delay_dist_)[index].second; 999 //LOG_F(LS_INFO) << "random[" << index << "] = " << delay; 1000 return static_cast<uint32_t>(delay); 1001 } 1002 1003 struct FunctionDomainCmp { 1004 bool operator()(const VirtualSocketServer::Point& p1, 1005 const VirtualSocketServer::Point& p2) { 1006 return p1.first < p2.first; 1007 } 1008 bool operator()(double v1, const VirtualSocketServer::Point& p2) { 1009 return v1 < p2.first; 1010 } 1011 bool operator()(const VirtualSocketServer::Point& p1, double v2) { 1012 return p1.first < v2; 1013 } 1014 }; 1015 1016 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { 1017 ASSERT(f->size() >= 1); 1018 double v = 0; 1019 for (Function::size_type i = 0; i < f->size() - 1; ++i) { 1020 double dx = (*f)[i + 1].first - (*f)[i].first; 1021 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2; 1022 (*f)[i].second = v; 1023 v = v + dx * avgy; 1024 } 1025 (*f)[f->size()-1].second = v; 1026 return f; 1027 } 1028 1029 VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) { 1030 for (Function::size_type i = 0; i < f->size(); ++i) 1031 std::swap((*f)[i].first, (*f)[i].second); 1032 1033 std::sort(f->begin(), f->end(), FunctionDomainCmp()); 1034 return f; 1035 } 1036 1037 VirtualSocketServer::Function* VirtualSocketServer::Resample(Function* f, 1038 double x1, 1039 double x2, 1040 uint32_t samples) { 1041 Function* g = new Function(); 1042 1043 for (size_t i = 0; i < samples; i++) { 1044 double x = x1 + (x2 - x1) * i / (samples - 1); 1045 double y = Evaluate(f, x); 1046 g->push_back(Point(x, y)); 1047 } 1048 1049 delete f; 1050 return g; 1051 } 1052 1053 double VirtualSocketServer::Evaluate(Function* f, double x) { 1054 Function::iterator iter = 1055 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); 1056 if (iter == f->begin()) { 1057 return (*f)[0].second; 1058 } else if (iter == f->end()) { 1059 ASSERT(f->size() >= 1); 1060 return (*f)[f->size() - 1].second; 1061 } else if (iter->first == x) { 1062 return iter->second; 1063 } else { 1064 double x1 = (iter - 1)->first; 1065 double y1 = (iter - 1)->second; 1066 double x2 = iter->first; 1067 double y2 = iter->second; 1068 return y1 + (y2 - y1) * (x - x1) / (x2 - x1); 1069 } 1070 } 1071 1072 bool VirtualSocketServer::CanInteractWith(VirtualSocket* local, 1073 VirtualSocket* remote) { 1074 if (!local || !remote) { 1075 return false; 1076 } 1077 IPAddress local_ip = local->GetLocalAddress().ipaddr(); 1078 IPAddress remote_ip = remote->GetLocalAddress().ipaddr(); 1079 IPAddress local_normalized = local_ip.Normalized(); 1080 IPAddress remote_normalized = remote_ip.Normalized(); 1081 // Check if the addresses are the same family after Normalization (turns 1082 // mapped IPv6 address into IPv4 addresses). 1083 // This will stop unmapped V6 addresses from talking to mapped V6 addresses. 1084 if (local_normalized.family() == remote_normalized.family()) { 1085 return true; 1086 } 1087 1088 // If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY. 1089 int remote_v6_only = 0; 1090 remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only); 1091 if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) { 1092 return true; 1093 } 1094 // Same check, backwards. 1095 int local_v6_only = 0; 1096 local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only); 1097 if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) { 1098 return true; 1099 } 1100 1101 // Check to see if either socket was explicitly bound to IPv6-any. 1102 // These sockets can talk with anyone. 1103 if (local_ip.family() == AF_INET6 && local->was_any()) { 1104 return true; 1105 } 1106 if (remote_ip.family() == AF_INET6 && remote->was_any()) { 1107 return true; 1108 } 1109 1110 return false; 1111 } 1112 1113 IPAddress VirtualSocketServer::GetDefaultRoute(int family) { 1114 if (family == AF_INET) { 1115 return default_route_v4_; 1116 } 1117 if (family == AF_INET6) { 1118 return default_route_v6_; 1119 } 1120 return IPAddress(); 1121 } 1122 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) { 1123 RTC_DCHECK(!IPIsAny(from_addr)); 1124 if (from_addr.family() == AF_INET) { 1125 default_route_v4_ = from_addr; 1126 } else if (from_addr.family() == AF_INET6) { 1127 default_route_v6_ = from_addr; 1128 } 1129 } 1130 1131 } // namespace rtc 1132