1 /* 2 * Copyright 2006 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include <math.h> 12 #include <time.h> 13 #if defined(WEBRTC_POSIX) 14 #include <netinet/in.h> 15 #endif 16 17 #include "webrtc/base/logging.h" 18 #include "webrtc/base/gunit.h" 19 #include "webrtc/base/testclient.h" 20 #include "webrtc/base/testutils.h" 21 #include "webrtc/base/thread.h" 22 #include "webrtc/base/timeutils.h" 23 #include "webrtc/base/virtualsocketserver.h" 24 25 using namespace rtc; 26 27 // Sends at a constant rate but with random packet sizes. 28 struct Sender : public MessageHandler { 29 Sender(Thread* th, AsyncSocket* s, uint32 rt) 30 : thread(th), socket(new AsyncUDPSocket(s)), 31 done(false), rate(rt), count(0) { 32 last_send = rtc::Time(); 33 thread->PostDelayed(NextDelay(), this, 1); 34 } 35 36 uint32 NextDelay() { 37 uint32 size = (rand() % 4096) + 1; 38 return 1000 * size / rate; 39 } 40 41 void OnMessage(Message* pmsg) { 42 ASSERT_EQ(1u, pmsg->message_id); 43 44 if (done) 45 return; 46 47 uint32 cur_time = rtc::Time(); 48 uint32 delay = cur_time - last_send; 49 uint32 size = rate * delay / 1000; 50 size = std::min<uint32>(size, 4096); 51 size = std::max<uint32>(size, sizeof(uint32)); 52 53 count += size; 54 memcpy(dummy, &cur_time, sizeof(cur_time)); 55 socket->Send(dummy, size, options); 56 57 last_send = cur_time; 58 thread->PostDelayed(NextDelay(), this, 1); 59 } 60 61 Thread* thread; 62 scoped_ptr<AsyncUDPSocket> socket; 63 rtc::PacketOptions options; 64 bool done; 65 uint32 rate; // bytes per second 66 uint32 count; 67 uint32 last_send; 68 char dummy[4096]; 69 }; 70 71 struct Receiver : public MessageHandler, public 
sigslot::has_slots<> { 72 Receiver(Thread* th, AsyncSocket* s, uint32 bw) 73 : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false), 74 count(0), sec_count(0), sum(0), sum_sq(0), samples(0) { 75 socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket); 76 thread->PostDelayed(1000, this, 1); 77 } 78 79 ~Receiver() { 80 thread->Clear(this); 81 } 82 83 void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size, 84 const SocketAddress& remote_addr, 85 const PacketTime& packet_time) { 86 ASSERT_EQ(socket.get(), s); 87 ASSERT_GE(size, 4U); 88 89 count += size; 90 sec_count += size; 91 92 uint32 send_time = *reinterpret_cast<const uint32*>(data); 93 uint32 recv_time = rtc::Time(); 94 uint32 delay = recv_time - send_time; 95 sum += delay; 96 sum_sq += delay * delay; 97 samples += 1; 98 } 99 100 void OnMessage(Message* pmsg) { 101 ASSERT_EQ(1u, pmsg->message_id); 102 103 if (done) 104 return; 105 106 // It is always possible for us to receive more than expected because 107 // packets can be further delayed in delivery. 
108 if (bandwidth > 0) 109 ASSERT_TRUE(sec_count <= 5 * bandwidth / 4); 110 sec_count = 0; 111 thread->PostDelayed(1000, this, 1); 112 } 113 114 Thread* thread; 115 scoped_ptr<AsyncUDPSocket> socket; 116 uint32 bandwidth; 117 bool done; 118 size_t count; 119 size_t sec_count; 120 double sum; 121 double sum_sq; 122 uint32 samples; 123 }; 124 125 class VirtualSocketServerTest : public testing::Test { 126 public: 127 VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)), 128 kIPv4AnyAddress(IPAddress(INADDR_ANY), 0), 129 kIPv6AnyAddress(IPAddress(in6addr_any), 0) { 130 } 131 132 void CheckAddressIncrementalization(const SocketAddress& post, 133 const SocketAddress& pre) { 134 EXPECT_EQ(post.port(), pre.port() + 1); 135 IPAddress post_ip = post.ipaddr(); 136 IPAddress pre_ip = pre.ipaddr(); 137 EXPECT_EQ(pre_ip.family(), post_ip.family()); 138 if (post_ip.family() == AF_INET) { 139 in_addr pre_ipv4 = pre_ip.ipv4_address(); 140 in_addr post_ipv4 = post_ip.ipv4_address(); 141 int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr); 142 EXPECT_EQ(1, difference); 143 } else if (post_ip.family() == AF_INET6) { 144 in6_addr post_ip6 = post_ip.ipv6_address(); 145 in6_addr pre_ip6 = pre_ip.ipv6_address(); 146 uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr); 147 uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr); 148 EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1); 149 } 150 } 151 152 void BasicTest(const SocketAddress& initial_addr) { 153 AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(), 154 SOCK_DGRAM); 155 socket->Bind(initial_addr); 156 SocketAddress server_addr = socket->GetLocalAddress(); 157 // Make sure VSS didn't switch families on us. 
158 EXPECT_EQ(server_addr.family(), initial_addr.family()); 159 160 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 161 AsyncSocket* socket2 = 162 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 163 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 164 165 SocketAddress client2_addr; 166 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 167 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 168 169 SocketAddress client1_addr; 170 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 171 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 172 EXPECT_EQ(client1_addr, server_addr); 173 174 SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family()); 175 for (int i = 0; i < 10; i++) { 176 client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty)); 177 178 SocketAddress next_client2_addr; 179 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 180 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr)); 181 CheckAddressIncrementalization(next_client2_addr, client2_addr); 182 // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1); 183 184 SocketAddress server_addr2; 185 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr)); 186 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2)); 187 EXPECT_EQ(server_addr2, server_addr); 188 189 client2_addr = next_client2_addr; 190 } 191 } 192 193 // initial_addr should be made from either INADDR_ANY or in6addr_any. 
194 void ConnectTest(const SocketAddress& initial_addr) { 195 testing::StreamSink sink; 196 SocketAddress accept_addr; 197 const SocketAddress kEmptyAddr = 198 EmptySocketAddressWithFamily(initial_addr.family()); 199 200 // Create client 201 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 202 SOCK_STREAM); 203 sink.Monitor(client); 204 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 205 EXPECT_TRUE(client->GetLocalAddress().IsNil()); 206 207 // Create server 208 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 209 SOCK_STREAM); 210 sink.Monitor(server); 211 EXPECT_NE(0, server->Listen(5)); // Bind required 212 EXPECT_EQ(0, server->Bind(initial_addr)); 213 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 214 EXPECT_EQ(0, server->Listen(5)); 215 EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING); 216 217 // No pending server connections 218 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 219 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 220 EXPECT_EQ(AF_UNSPEC, accept_addr.family()); 221 222 // Attempt connect to listening socket 223 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 224 EXPECT_NE(client->GetLocalAddress(), kEmptyAddr); // Implicit Bind 225 EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family()); // Implicit Bind 226 EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress()); 227 228 // Client is connecting 229 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 230 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 231 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 232 233 ss_->ProcessMessagesUntilIdle(); 234 235 // Client still connecting 236 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 237 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 238 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 239 240 // Server has pending connection 241 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 242 Socket* accepted = 
server->Accept(&accept_addr); 243 EXPECT_TRUE(NULL != accepted); 244 EXPECT_NE(accept_addr, kEmptyAddr); 245 EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr); 246 247 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 248 EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress()); 249 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress()); 250 251 ss_->ProcessMessagesUntilIdle(); 252 253 // Client has connected 254 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED); 255 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 256 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 257 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 258 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress()); 259 } 260 261 void ConnectToNonListenerTest(const SocketAddress& initial_addr) { 262 testing::StreamSink sink; 263 SocketAddress accept_addr; 264 const SocketAddress nil_addr; 265 const SocketAddress empty_addr = 266 EmptySocketAddressWithFamily(initial_addr.family()); 267 268 // Create client 269 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 270 SOCK_STREAM); 271 sink.Monitor(client); 272 273 // Create server 274 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 275 SOCK_STREAM); 276 sink.Monitor(server); 277 EXPECT_EQ(0, server->Bind(initial_addr)); 278 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 279 // Attempt connect to non-listening socket 280 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 281 282 ss_->ProcessMessagesUntilIdle(); 283 284 // No pending server connections 285 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 286 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 287 EXPECT_EQ(accept_addr, nil_addr); 288 289 // Connection failed 290 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 291 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 292 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR)); 293 
EXPECT_EQ(client->GetRemoteAddress(), nil_addr); 294 } 295 296 void CloseDuringConnectTest(const SocketAddress& initial_addr) { 297 testing::StreamSink sink; 298 SocketAddress accept_addr; 299 const SocketAddress empty_addr = 300 EmptySocketAddressWithFamily(initial_addr.family()); 301 302 // Create client and server 303 scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(), 304 SOCK_STREAM)); 305 sink.Monitor(client.get()); 306 scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(), 307 SOCK_STREAM)); 308 sink.Monitor(server.get()); 309 310 // Initiate connect 311 EXPECT_EQ(0, server->Bind(initial_addr)); 312 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 313 314 EXPECT_EQ(0, server->Listen(5)); 315 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 316 317 // Server close before socket enters accept queue 318 EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ)); 319 server->Close(); 320 321 ss_->ProcessMessagesUntilIdle(); 322 323 // Result: connection failed 324 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 325 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR)); 326 327 server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM)); 328 sink.Monitor(server.get()); 329 330 // Initiate connect 331 EXPECT_EQ(0, server->Bind(initial_addr)); 332 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 333 334 EXPECT_EQ(0, server->Listen(5)); 335 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 336 337 ss_->ProcessMessagesUntilIdle(); 338 339 // Server close while socket is in accept queue 340 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ)); 341 server->Close(); 342 343 ss_->ProcessMessagesUntilIdle(); 344 345 // Result: connection failed 346 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 347 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR)); 348 349 // New server 350 
server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM)); 351 sink.Monitor(server.get()); 352 353 // Initiate connect 354 EXPECT_EQ(0, server->Bind(initial_addr)); 355 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 356 357 EXPECT_EQ(0, server->Listen(5)); 358 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 359 360 ss_->ProcessMessagesUntilIdle(); 361 362 // Server accepts connection 363 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ)); 364 scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr)); 365 ASSERT_TRUE(NULL != accepted.get()); 366 sink.Monitor(accepted.get()); 367 368 // Client closes before connection complets 369 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 370 371 // Connected message has not been processed yet. 372 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 373 client->Close(); 374 375 ss_->ProcessMessagesUntilIdle(); 376 377 // Result: accepted socket closes 378 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED); 379 EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE)); 380 EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE)); 381 } 382 383 void CloseTest(const SocketAddress& initial_addr) { 384 testing::StreamSink sink; 385 const SocketAddress kEmptyAddr; 386 387 // Create clients 388 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 389 sink.Monitor(a); 390 a->Bind(initial_addr); 391 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 392 393 394 scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(), 395 SOCK_STREAM)); 396 sink.Monitor(b.get()); 397 b->Bind(initial_addr); 398 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 399 400 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 401 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 402 403 ss_->ProcessMessagesUntilIdle(); 404 405 EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN)); 406 EXPECT_EQ(a->GetState(), 
AsyncSocket::CS_CONNECTED); 407 EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress()); 408 409 EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN)); 410 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED); 411 EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress()); 412 413 EXPECT_EQ(1, a->Send("a", 1)); 414 b->Close(); 415 EXPECT_EQ(1, a->Send("b", 1)); 416 417 ss_->ProcessMessagesUntilIdle(); 418 419 char buffer[10]; 420 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ)); 421 EXPECT_EQ(-1, b->Recv(buffer, 10)); 422 423 EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE)); 424 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED); 425 EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr); 426 427 // No signal for Closer 428 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE)); 429 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED); 430 EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr); 431 } 432 433 void TcpSendTest(const SocketAddress& initial_addr) { 434 testing::StreamSink sink; 435 const SocketAddress kEmptyAddr; 436 437 // Connect two sockets 438 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 439 sink.Monitor(a); 440 a->Bind(initial_addr); 441 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 442 443 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 444 sink.Monitor(b); 445 b->Bind(initial_addr); 446 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 447 448 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 449 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 450 451 ss_->ProcessMessagesUntilIdle(); 452 453 const size_t kBufferSize = 2000; 454 ss_->set_send_buffer_capacity(kBufferSize); 455 ss_->set_recv_buffer_capacity(kBufferSize); 456 457 const size_t kDataSize = 5000; 458 char send_buffer[kDataSize], recv_buffer[kDataSize]; 459 for (size_t i = 0; i < kDataSize; ++i) 460 send_buffer[i] = static_cast<char>(i % 256); 461 memset(recv_buffer, 0, sizeof(recv_buffer)); 462 size_t send_pos = 0, recv_pos = 
0; 463 464 // Can't send more than send buffer in one write 465 int result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 466 EXPECT_EQ(static_cast<int>(kBufferSize), result); 467 send_pos += result; 468 469 ss_->ProcessMessagesUntilIdle(); 470 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 471 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 472 473 // Receive buffer is already filled, fill send buffer again 474 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 475 EXPECT_EQ(static_cast<int>(kBufferSize), result); 476 send_pos += result; 477 478 ss_->ProcessMessagesUntilIdle(); 479 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 480 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 481 482 // No more room in send or receive buffer 483 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 484 EXPECT_EQ(-1, result); 485 EXPECT_TRUE(a->IsBlocking()); 486 487 // Read a subset of the data 488 result = b->Recv(recv_buffer + recv_pos, 500); 489 EXPECT_EQ(500, result); 490 recv_pos += result; 491 492 ss_->ProcessMessagesUntilIdle(); 493 EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE)); 494 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 495 496 // Room for more on the sending side 497 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 498 EXPECT_EQ(500, result); 499 send_pos += result; 500 501 // Empty the recv buffer 502 while (true) { 503 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 504 if (result < 0) { 505 EXPECT_EQ(-1, result); 506 EXPECT_TRUE(b->IsBlocking()); 507 break; 508 } 509 recv_pos += result; 510 } 511 512 ss_->ProcessMessagesUntilIdle(); 513 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 514 515 // Continue to empty the recv buffer 516 while (true) { 517 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 518 if (result < 0) { 519 EXPECT_EQ(-1, result); 520 EXPECT_TRUE(b->IsBlocking()); 521 break; 522 } 523 recv_pos += result; 524 } 525 526 // Send last of the data 527 result = 
a->Send(send_buffer + send_pos, kDataSize - send_pos); 528 EXPECT_EQ(500, result); 529 send_pos += result; 530 531 ss_->ProcessMessagesUntilIdle(); 532 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 533 534 // Receive the last of the data 535 while (true) { 536 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 537 if (result < 0) { 538 EXPECT_EQ(-1, result); 539 EXPECT_TRUE(b->IsBlocking()); 540 break; 541 } 542 recv_pos += result; 543 } 544 545 ss_->ProcessMessagesUntilIdle(); 546 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 547 548 // The received data matches the sent data 549 EXPECT_EQ(kDataSize, send_pos); 550 EXPECT_EQ(kDataSize, recv_pos); 551 EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize)); 552 } 553 554 void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) { 555 const SocketAddress kEmptyAddr; 556 557 // Connect two sockets 558 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), 559 SOCK_STREAM); 560 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), 561 SOCK_STREAM); 562 a->Bind(initial_addr); 563 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 564 565 b->Bind(initial_addr); 566 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 567 568 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 569 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 570 ss_->ProcessMessagesUntilIdle(); 571 572 // First, deliver all packets in 0 ms. 
573 char buffer[2] = { 0, 0 }; 574 const char cNumPackets = 10; 575 for (char i = 0; i < cNumPackets; ++i) { 576 buffer[0] = '0' + i; 577 EXPECT_EQ(1, a->Send(buffer, 1)); 578 } 579 580 ss_->ProcessMessagesUntilIdle(); 581 582 for (char i = 0; i < cNumPackets; ++i) { 583 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 584 EXPECT_EQ(static_cast<char>('0' + i), buffer[0]); 585 } 586 587 // Next, deliver packets at random intervals 588 const uint32 mean = 50; 589 const uint32 stddev = 50; 590 591 ss_->set_delay_mean(mean); 592 ss_->set_delay_stddev(stddev); 593 ss_->UpdateDelayDistribution(); 594 595 for (char i = 0; i < cNumPackets; ++i) { 596 buffer[0] = 'A' + i; 597 EXPECT_EQ(1, a->Send(buffer, 1)); 598 } 599 600 ss_->ProcessMessagesUntilIdle(); 601 602 for (char i = 0; i < cNumPackets; ++i) { 603 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 604 EXPECT_EQ(static_cast<char>('A' + i), buffer[0]); 605 } 606 } 607 608 void BandwidthTest(const SocketAddress& initial_addr) { 609 AsyncSocket* send_socket = 610 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 611 AsyncSocket* recv_socket = 612 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 613 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 614 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 615 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 616 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 617 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 618 619 uint32 bandwidth = 64 * 1024; 620 ss_->set_bandwidth(bandwidth); 621 622 Thread* pthMain = Thread::Current(); 623 Sender sender(pthMain, send_socket, 80 * 1024); 624 Receiver receiver(pthMain, recv_socket, bandwidth); 625 626 pthMain->ProcessMessages(5000); 627 sender.done = true; 628 pthMain->ProcessMessages(5000); 629 630 ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4); 631 ASSERT_TRUE(receiver.count <= 6 * bandwidth); // queue could drain for 1s 632 633 
ss_->set_bandwidth(0); 634 } 635 636 void DelayTest(const SocketAddress& initial_addr) { 637 time_t seed = ::time(NULL); 638 LOG(LS_VERBOSE) << "seed = " << seed; 639 srand(static_cast<unsigned int>(seed)); 640 641 const uint32 mean = 2000; 642 const uint32 stddev = 500; 643 644 ss_->set_delay_mean(mean); 645 ss_->set_delay_stddev(stddev); 646 ss_->UpdateDelayDistribution(); 647 648 AsyncSocket* send_socket = 649 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 650 AsyncSocket* recv_socket = 651 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 652 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 653 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 654 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 655 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 656 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 657 658 Thread* pthMain = Thread::Current(); 659 // Avg packet size is 2K, so at 200KB/s for 10s, we should see about 660 // 1000 packets, which is necessary to get a good distribution. 661 Sender sender(pthMain, send_socket, 100 * 2 * 1024); 662 Receiver receiver(pthMain, recv_socket, 0); 663 664 pthMain->ProcessMessages(10000); 665 sender.done = receiver.done = true; 666 ss_->ProcessMessagesUntilIdle(); 667 668 const double sample_mean = receiver.sum / receiver.samples; 669 double num = 670 receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum; 671 double den = receiver.samples * (receiver.samples - 1); 672 const double sample_stddev = sqrt(num / den); 673 LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev; 674 675 EXPECT_LE(500u, receiver.samples); 676 // We initially used a 0.1 fudge factor, but on the build machine, we 677 // have seen the value differ by as much as 0.13. 
678 EXPECT_NEAR(mean, sample_mean, 0.15 * mean); 679 EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev); 680 681 ss_->set_delay_mean(0); 682 ss_->set_delay_stddev(0); 683 ss_->UpdateDelayDistribution(); 684 } 685 686 // Test cross-family communication between a client bound to client_addr and a 687 // server bound to server_addr. shouldSucceed indicates if communication is 688 // expected to work or not. 689 void CrossFamilyConnectionTest(const SocketAddress& client_addr, 690 const SocketAddress& server_addr, 691 bool shouldSucceed) { 692 testing::StreamSink sink; 693 SocketAddress accept_address; 694 const SocketAddress kEmptyAddr; 695 696 // Client gets a IPv4 address 697 AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(), 698 SOCK_STREAM); 699 sink.Monitor(client); 700 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 701 EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr); 702 client->Bind(client_addr); 703 704 // Server gets a non-mapped non-any IPv6 address. 705 // IPv4 sockets should not be able to connect to this. 706 AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(), 707 SOCK_STREAM); 708 sink.Monitor(server); 709 server->Bind(server_addr); 710 server->Listen(5); 711 712 if (shouldSucceed) { 713 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 714 ss_->ProcessMessagesUntilIdle(); 715 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 716 Socket* accepted = server->Accept(&accept_address); 717 EXPECT_TRUE(NULL != accepted); 718 EXPECT_NE(kEmptyAddr, accept_address); 719 ss_->ProcessMessagesUntilIdle(); 720 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 721 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 722 } else { 723 // Check that the connection failed. 
724 EXPECT_EQ(-1, client->Connect(server->GetLocalAddress())); 725 ss_->ProcessMessagesUntilIdle(); 726 727 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 728 EXPECT_TRUE(NULL == server->Accept(&accept_address)); 729 EXPECT_EQ(accept_address, kEmptyAddr); 730 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 731 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 732 EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr); 733 } 734 } 735 736 // Test cross-family datagram sending between a client bound to client_addr 737 // and a server bound to server_addr. shouldSucceed indicates if sending is 738 // expected to succed or not. 739 void CrossFamilyDatagramTest(const SocketAddress& client_addr, 740 const SocketAddress& server_addr, 741 bool shouldSucceed) { 742 AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM); 743 socket->Bind(server_addr); 744 SocketAddress bound_server_addr = socket->GetLocalAddress(); 745 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 746 747 AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM); 748 socket2->Bind(client_addr); 749 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 750 SocketAddress client2_addr; 751 752 if (shouldSucceed) { 753 EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr)); 754 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 755 SocketAddress client1_addr; 756 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 757 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 758 EXPECT_EQ(client1_addr, bound_server_addr); 759 } else { 760 EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr)); 761 EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0)); 762 } 763 } 764 765 protected: 766 virtual void SetUp() { 767 Thread::Current()->set_socketserver(ss_); 768 } 769 virtual void TearDown() { 770 Thread::Current()->set_socketserver(NULL); 771 } 772 773 VirtualSocketServer* ss_; 774 const SocketAddress kIPv4AnyAddress; 775 const 
SocketAddress kIPv6AnyAddress; 776 }; 777 778 TEST_F(VirtualSocketServerTest, basic_v4) { 779 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000); 780 BasicTest(ipv4_test_addr); 781 } 782 783 TEST_F(VirtualSocketServerTest, basic_v6) { 784 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000); 785 BasicTest(ipv6_test_addr); 786 } 787 788 TEST_F(VirtualSocketServerTest, connect_v4) { 789 ConnectTest(kIPv4AnyAddress); 790 } 791 792 TEST_F(VirtualSocketServerTest, connect_v6) { 793 ConnectTest(kIPv6AnyAddress); 794 } 795 796 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) { 797 ConnectToNonListenerTest(kIPv4AnyAddress); 798 } 799 800 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) { 801 ConnectToNonListenerTest(kIPv6AnyAddress); 802 } 803 804 TEST_F(VirtualSocketServerTest, close_during_connect_v4) { 805 CloseDuringConnectTest(kIPv4AnyAddress); 806 } 807 808 TEST_F(VirtualSocketServerTest, close_during_connect_v6) { 809 CloseDuringConnectTest(kIPv6AnyAddress); 810 } 811 812 TEST_F(VirtualSocketServerTest, close_v4) { 813 CloseTest(kIPv4AnyAddress); 814 } 815 816 TEST_F(VirtualSocketServerTest, close_v6) { 817 CloseTest(kIPv6AnyAddress); 818 } 819 820 TEST_F(VirtualSocketServerTest, tcp_send_v4) { 821 TcpSendTest(kIPv4AnyAddress); 822 } 823 824 TEST_F(VirtualSocketServerTest, tcp_send_v6) { 825 TcpSendTest(kIPv6AnyAddress); 826 } 827 828 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) { 829 TcpSendsPacketsInOrderTest(kIPv4AnyAddress); 830 } 831 832 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) { 833 TcpSendsPacketsInOrderTest(kIPv6AnyAddress); 834 } 835 836 TEST_F(VirtualSocketServerTest, bandwidth_v4) { 837 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000); 838 BandwidthTest(ipv4_test_addr); 839 } 840 841 TEST_F(VirtualSocketServerTest, bandwidth_v6) { 842 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000); 843 BandwidthTest(ipv6_test_addr); 844 } 845 846 TEST_F(VirtualSocketServerTest, 
delay_v4) { 847 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000); 848 DelayTest(ipv4_test_addr); 849 } 850 851 // See: https://code.google.com/p/webrtc/issues/detail?id=2409 852 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) { 853 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000); 854 DelayTest(ipv6_test_addr); 855 } 856 857 // Works, receiving socket sees 127.0.0.2. 858 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) { 859 CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0), 860 SocketAddress("0.0.0.0", 5000), 861 true); 862 } 863 864 // Fails. 865 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) { 866 CrossFamilyConnectionTest(SocketAddress("::2", 0), 867 SocketAddress("0.0.0.0", 5000), 868 false); 869 } 870 871 // Fails. 872 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) { 873 CrossFamilyConnectionTest(SocketAddress("::2", 0), 874 SocketAddress("::ffff:127.0.0.1", 5000), 875 false); 876 } 877 878 // Works. receiving socket sees ::ffff:127.0.0.2. 879 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) { 880 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 881 SocketAddress("::", 5000), 882 true); 883 } 884 885 // Fails. 886 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) { 887 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 888 SocketAddress("::1", 5000), 889 false); 890 } 891 892 // Works. Receiving socket sees ::ffff:127.0.0.1. 893 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) { 894 CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0), 895 SocketAddress("::ffff:127.0.0.2", 5000), 896 true); 897 } 898 899 // Works, receiving socket sees a result from GetNextIP. 
900 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) { 901 CrossFamilyConnectionTest(SocketAddress("::", 0), 902 SocketAddress("0.0.0.0", 5000), 903 true); 904 } 905 906 // Works, receiving socket sees whatever GetNextIP gave the client. 907 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) { 908 CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0), 909 SocketAddress("::", 5000), 910 true); 911 } 912 913 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) { 914 CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0), 915 SocketAddress("::", 5000), 916 true); 917 } 918 919 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) { 920 CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0), 921 SocketAddress("0.0.0.0", 5000), 922 true); 923 } 924 925 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) { 926 CrossFamilyDatagramTest(SocketAddress("::2", 0), 927 SocketAddress("0.0.0.0", 5000), 928 false); 929 } 930 931 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) { 932 CrossFamilyDatagramTest(SocketAddress("::2", 0), 933 SocketAddress("::ffff:127.0.0.1", 5000), 934 false); 935 } 936 937 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) { 938 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 939 SocketAddress("::", 5000), 940 true); 941 } 942 943 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) { 944 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 945 SocketAddress("::1", 5000), 946 false); 947 } 948 949 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) { 950 CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0), 951 SocketAddress("::ffff:127.0.0.2", 5000), 952 true); 953 } 954 955 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) { 956 CrossFamilyDatagramTest(SocketAddress("::", 0), 957 SocketAddress("0.0.0.0", 5000), 958 true); 959 } 960 961 
// For every combination of mean, relative deviation and sample count below,
// asks VirtualSocketServer::CreateDistribution() for a sampled function and
// checks that the mean and (population) standard deviation of the .second
// values are within 10% of what was requested.
TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
  const uint32 kTestMean[] = { 10, 100, 333, 1000 };
  const double kTestDev[] = { 0.25, 0.1, 0.01 };
  // TODO: The current code only works for 1000 data points or more.
  const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
  for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
    for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
      for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
        ASSERT_LT(0u, kTestSamples[sidx]);
        const uint32 kStdDev =
            static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
        // Held in a scoped_ptr so the distribution is released even when one
        // of the fatal assertions below returns from the test early.
        scoped_ptr<VirtualSocketServer::Function> f(
            VirtualSocketServer::CreateDistribution(kTestMean[midx],
                                                    kStdDev,
                                                    kTestSamples[sidx]));
        ASSERT_TRUE(NULL != f.get());
        ASSERT_EQ(kTestSamples[sidx], f->size());
        double sum = 0;
        for (size_t i = 0; i < f->size(); ++i) {
          sum += (*f)[i].second;
        }
        const double mean = sum / f->size();
        double sum_sq_dev = 0;
        for (size_t i = 0; i < f->size(); ++i) {
          double dev = (*f)[i].second - mean;
          sum_sq_dev += dev * dev;
        }
        const double stddev = sqrt(sum_sq_dev / f->size());
        EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
            << "M=" << kTestMean[midx]
            << " SD=" << kStdDev
            << " N=" << kTestSamples[sidx];
        EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
            << "M=" << kTestMean[midx]
            << " SD=" << kStdDev
            << " N=" << kTestSamples[sidx];
      }
    }
  }
}