1 /* 2 * libjingle 3 * Copyright 2006, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <time.h> 29 #ifdef POSIX 30 #include <netinet/in.h> 31 #endif 32 #include <cmath> 33 34 #include "talk/base/logging.h" 35 #include "talk/base/gunit.h" 36 #include "talk/base/testclient.h" 37 #include "talk/base/testutils.h" 38 #include "talk/base/thread.h" 39 #include "talk/base/timeutils.h" 40 #include "talk/base/virtualsocketserver.h" 41 42 using namespace talk_base; 43 44 // Sends at a constant rate but with random packet sizes. 45 struct Sender : public MessageHandler { 46 Sender(Thread* th, AsyncSocket* s, uint32 rt) 47 : thread(th), socket(new AsyncUDPSocket(s)), 48 done(false), rate(rt), count(0) { 49 last_send = Time(); 50 thread->PostDelayed(NextDelay(), this, 1); 51 } 52 53 uint32 NextDelay() { 54 uint32 size = (rand() % 4096) + 1; 55 return 1000 * size / rate; 56 } 57 58 void OnMessage(Message* pmsg) { 59 ASSERT_EQ(1u, pmsg->message_id); 60 61 if (done) 62 return; 63 64 uint32 cur_time = Time(); 65 uint32 delay = cur_time - last_send; 66 uint32 size = rate * delay / 1000; 67 size = std::min<uint32>(size, 4096); 68 size = std::max<uint32>(size, sizeof(uint32)); 69 70 count += size; 71 memcpy(dummy, &cur_time, sizeof(cur_time)); 72 socket->Send(dummy, size); 73 74 last_send = cur_time; 75 thread->PostDelayed(NextDelay(), this, 1); 76 } 77 78 Thread* thread; 79 scoped_ptr<AsyncUDPSocket> socket; 80 bool done; 81 uint32 rate; // bytes per second 82 uint32 count; 83 uint32 last_send; 84 char dummy[4096]; 85 }; 86 87 struct Receiver : public MessageHandler, public sigslot::has_slots<> { 88 Receiver(Thread* th, AsyncSocket* s, uint32 bw) 89 : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false), 90 count(0), sec_count(0), sum(0), sum_sq(0), samples(0) { 91 socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket); 92 thread->PostDelayed(1000, this, 1); 93 } 94 95 ~Receiver() { 96 thread->Clear(this); 97 } 98 99 void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size, 100 const SocketAddress& remote_addr) { 101 ASSERT_EQ(socket.get(), s); 102 ASSERT_GE(size, 4U); 103 104 count += size; 105 sec_count += size; 106 107 uint32 send_time = *reinterpret_cast<const uint32*>(data); 108 uint32 recv_time = Time(); 109 uint32 delay = recv_time - send_time; 110 sum += delay; 111 sum_sq += delay * delay; 112 samples += 1; 113 } 114 115 void OnMessage(Message* pmsg) { 116 ASSERT_EQ(1u, pmsg->message_id); 117 118 if (done) 119 return; 120 121 // It is always possible for us to receive more than expected because 122 // packets can be further delayed in delivery. 123 if (bandwidth > 0) 124 ASSERT_TRUE(sec_count <= 5 * bandwidth / 4); 125 sec_count = 0; 126 thread->PostDelayed(1000, this, 1); 127 } 128 129 Thread* thread; 130 scoped_ptr<AsyncUDPSocket> socket; 131 uint32 bandwidth; 132 bool done; 133 size_t count; 134 size_t sec_count; 135 double sum; 136 double sum_sq; 137 uint32 samples; 138 }; 139 140 class VirtualSocketServerTest : public testing::Test { 141 public: 142 VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)), 143 kIPv4AnyAddress(IPAddress(INADDR_ANY), 0), 144 kIPv6AnyAddress(IPAddress(in6addr_any), 0) { 145 } 146 147 void CheckAddressIncrementalization(const SocketAddress& post, 148 const SocketAddress& pre) { 149 EXPECT_EQ(post.port(), pre.port() + 1); 150 IPAddress post_ip = post.ipaddr(); 151 IPAddress pre_ip = pre.ipaddr(); 152 EXPECT_EQ(pre_ip.family(), post_ip.family()); 153 if (post_ip.family() == AF_INET) { 154 in_addr pre_ipv4 = pre_ip.ipv4_address(); 155 in_addr post_ipv4 = post_ip.ipv4_address(); 156 int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr); 157 EXPECT_EQ(1, difference); 158 } else if (post_ip.family() == AF_INET6) { 159 in6_addr post_ip6 = post_ip.ipv6_address(); 160 in6_addr pre_ip6 = pre_ip.ipv6_address(); 161 uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr); 162 uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr); 163 EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1); 164 } 165 } 166 167 void BasicTest(const SocketAddress& initial_addr) { 168 AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(), 169 SOCK_DGRAM); 170 socket->Bind(initial_addr); 171 SocketAddress server_addr = socket->GetLocalAddress(); 172 // Make sure VSS didn't switch families on us. 173 EXPECT_EQ(server_addr.family(), initial_addr.family()); 174 175 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 176 AsyncSocket* socket2 = 177 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 178 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 179 180 SocketAddress client2_addr; 181 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 182 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 183 184 SocketAddress client1_addr; 185 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 186 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 187 EXPECT_EQ(client1_addr, server_addr); 188 189 SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family()); 190 for (int i = 0; i < 10; i++) { 191 client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty)); 192 193 SocketAddress next_client2_addr; 194 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 195 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr)); 196 CheckAddressIncrementalization(next_client2_addr, client2_addr); 197 // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1); 198 199 SocketAddress server_addr2; 200 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr)); 201 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2)); 202 EXPECT_EQ(server_addr2, server_addr); 203 204 client2_addr = next_client2_addr; 205 } 206 } 207 208 // initial_addr should be made from either INADDR_ANY or in6addr_any. 209 void ConnectTest(const SocketAddress& initial_addr) { 210 testing::StreamSink sink; 211 SocketAddress accept_addr; 212 const SocketAddress kEmptyAddr = 213 EmptySocketAddressWithFamily(initial_addr.family()); 214 215 // Create client 216 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 217 SOCK_STREAM); 218 sink.Monitor(client); 219 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 220 EXPECT_TRUE(client->GetLocalAddress().IsNil()); 221 222 // Create server 223 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 224 SOCK_STREAM); 225 sink.Monitor(server); 226 EXPECT_NE(0, server->Listen(5)); // Bind required 227 EXPECT_EQ(0, server->Bind(initial_addr)); 228 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 229 EXPECT_EQ(0, server->Listen(5)); 230 EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING); 231 232 // No pending server connections 233 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 234 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 235 EXPECT_EQ(AF_UNSPEC, accept_addr.family()); 236 237 // Attempt connect to listening socket 238 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 239 EXPECT_NE(client->GetLocalAddress(), kEmptyAddr); // Implicit Bind 240 EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family()); // Implicit Bind 241 EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress()); 242 243 // Client is connecting 244 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 245 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 246 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 247 248 ss_->ProcessMessagesUntilIdle(); 249 250 // Client still connecting 251 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 252 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 253 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 254 255 // Server has pending connection 256 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 257 Socket* accepted = server->Accept(&accept_addr); 258 EXPECT_TRUE(NULL != accepted); 259 EXPECT_NE(accept_addr, kEmptyAddr); 260 EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr); 261 262 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 263 EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress()); 264 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress()); 265 266 ss_->ProcessMessagesUntilIdle(); 267 268 // Client has connected 269 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED); 270 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 271 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 272 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 273 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress()); 274 } 275 276 void ConnectToNonListenerTest(const SocketAddress& initial_addr) { 277 testing::StreamSink sink; 278 SocketAddress accept_addr; 279 const SocketAddress nil_addr; 280 const SocketAddress empty_addr = 281 EmptySocketAddressWithFamily(initial_addr.family()); 282 283 // Create client 284 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 285 SOCK_STREAM); 286 sink.Monitor(client); 287 288 // Create server 289 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 290 SOCK_STREAM); 291 sink.Monitor(server); 292 EXPECT_EQ(0, server->Bind(initial_addr)); 293 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 294 // Attempt connect to non-listening socket 295 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 296 297 ss_->ProcessMessagesUntilIdle(); 298 299 // No pending server connections 300 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 301 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 302 EXPECT_EQ(accept_addr, nil_addr); 303 304 // Connection failed 305 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 306 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 307 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR)); 308 EXPECT_EQ(client->GetRemoteAddress(), nil_addr); 309 } 310 311 void CloseDuringConnectTest(const SocketAddress& initial_addr) { 312 testing::StreamSink sink; 313 SocketAddress accept_addr; 314 const SocketAddress empty_addr = 315 EmptySocketAddressWithFamily(initial_addr.family()); 316 317 // Create client and server 318 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 319 SOCK_STREAM); 320 sink.Monitor(client); 321 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 322 SOCK_STREAM); 323 sink.Monitor(server); 324 325 // Initiate connect 326 EXPECT_EQ(0, server->Bind(initial_addr)); 327 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 328 329 EXPECT_EQ(0, server->Listen(5)); 330 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 331 332 // Server close before socket enters accept queue 333 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 334 server->Close(); 335 336 ss_->ProcessMessagesUntilIdle(); 337 338 // Result: connection failed 339 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 340 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR)); 341 342 // New server 343 delete server; 344 server = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 345 sink.Monitor(server); 346 347 // Initiate connect 348 EXPECT_EQ(0, server->Bind(initial_addr)); 349 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 350 351 EXPECT_EQ(0, server->Listen(5)); 352 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 353 354 ss_->ProcessMessagesUntilIdle(); 355 356 // Server close while socket is in accept queue 357 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 358 server->Close(); 359 360 ss_->ProcessMessagesUntilIdle(); 361 362 // Result: connection failed 363 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 364 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR)); 365 366 // New server 367 delete server; 368 server = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 369 sink.Monitor(server); 370 371 // Initiate connect 372 EXPECT_EQ(0, server->Bind(initial_addr)); 373 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 374 375 EXPECT_EQ(0, server->Listen(5)); 376 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 377 378 ss_->ProcessMessagesUntilIdle(); 379 380 // Server accepts connection 381 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 382 AsyncSocket* accepted = server->Accept(&accept_addr); 383 ASSERT_TRUE(NULL != accepted); 384 sink.Monitor(accepted); 385 386 // Client closes before connection complets 387 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 388 389 // Connected message has not been processed yet. 390 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 391 client->Close(); 392 393 ss_->ProcessMessagesUntilIdle(); 394 395 // Result: accepted socket closes 396 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED); 397 EXPECT_TRUE(sink.Check(accepted, testing::SSE_CLOSE)); 398 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 399 } 400 401 void CloseTest(const SocketAddress& initial_addr) { 402 testing::StreamSink sink; 403 const SocketAddress kEmptyAddr; 404 405 // Create clients 406 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 407 sink.Monitor(a); 408 a->Bind(initial_addr); 409 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 410 411 412 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 413 sink.Monitor(b); 414 b->Bind(initial_addr); 415 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 416 417 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 418 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 419 420 ss_->ProcessMessagesUntilIdle(); 421 422 EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN)); 423 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED); 424 EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress()); 425 426 EXPECT_TRUE(sink.Check(b, testing::SSE_OPEN)); 427 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED); 428 EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress()); 429 430 EXPECT_EQ(1, a->Send("a", 1)); 431 b->Close(); 432 EXPECT_EQ(1, a->Send("b", 1)); 433 434 ss_->ProcessMessagesUntilIdle(); 435 436 char buffer[10]; 437 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 438 EXPECT_EQ(-1, b->Recv(buffer, 10)); 439 440 EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE)); 441 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED); 442 EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr); 443 444 EXPECT_FALSE(sink.Check(b, testing::SSE_CLOSE)); // No signal for Closer 445 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED); 446 EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr); 447 } 448 449 void TcpSendTest(const SocketAddress& initial_addr) { 450 testing::StreamSink sink; 451 const SocketAddress kEmptyAddr; 452 453 // Connect two sockets 454 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 455 sink.Monitor(a); 456 a->Bind(initial_addr); 457 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 458 459 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 460 sink.Monitor(b); 461 b->Bind(initial_addr); 462 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 463 464 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 465 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 466 467 ss_->ProcessMessagesUntilIdle(); 468 469 const size_t kBufferSize = 2000; 470 ss_->set_send_buffer_capacity(kBufferSize); 471 ss_->set_recv_buffer_capacity(kBufferSize); 472 473 const size_t kDataSize = 5000; 474 char send_buffer[kDataSize], recv_buffer[kDataSize]; 475 for (size_t i = 0; i < kDataSize; ++i) 476 send_buffer[i] = static_cast<char>(i % 256); 477 memset(recv_buffer, 0, sizeof(recv_buffer)); 478 size_t send_pos = 0, recv_pos = 0; 479 480 // Can't send more than send buffer in one write 481 int result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 482 EXPECT_EQ(static_cast<int>(kBufferSize), result); 483 send_pos += result; 484 485 ss_->ProcessMessagesUntilIdle(); 486 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 487 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 488 489 // Receive buffer is already filled, fill send buffer again 490 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 491 EXPECT_EQ(static_cast<int>(kBufferSize), result); 492 send_pos += result; 493 494 ss_->ProcessMessagesUntilIdle(); 495 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 496 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 497 498 // No more room in send or receive buffer 499 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 500 EXPECT_EQ(-1, result); 501 EXPECT_TRUE(a->IsBlocking()); 502 503 // Read a subset of the data 504 result = b->Recv(recv_buffer + recv_pos, 500); 505 EXPECT_EQ(500, result); 506 recv_pos += result; 507 508 ss_->ProcessMessagesUntilIdle(); 509 EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE)); 510 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 511 512 // Room for more on the sending side 513 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 514 EXPECT_EQ(500, result); 515 send_pos += result; 516 517 // Empty the recv buffer 518 while (true) { 519 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 520 if (result < 0) { 521 EXPECT_EQ(-1, result); 522 EXPECT_TRUE(b->IsBlocking()); 523 break; 524 } 525 recv_pos += result; 526 } 527 528 ss_->ProcessMessagesUntilIdle(); 529 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 530 531 // Continue to empty the recv buffer 532 while (true) { 533 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 534 if (result < 0) { 535 EXPECT_EQ(-1, result); 536 EXPECT_TRUE(b->IsBlocking()); 537 break; 538 } 539 recv_pos += result; 540 } 541 542 // Send last of the data 543 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 544 EXPECT_EQ(500, result); 545 send_pos += result; 546 547 ss_->ProcessMessagesUntilIdle(); 548 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 549 550 // Receive the last of the data 551 while (true) { 552 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 553 if (result < 0) { 554 EXPECT_EQ(-1, result); 555 EXPECT_TRUE(b->IsBlocking()); 556 break; 557 } 558 recv_pos += result; 559 } 560 561 ss_->ProcessMessagesUntilIdle(); 562 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 563 564 // The received data matches the sent data 565 EXPECT_EQ(kDataSize, send_pos); 566 EXPECT_EQ(kDataSize, recv_pos); 567 EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize)); 568 } 569 570 void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) { 571 const SocketAddress kEmptyAddr; 572 573 // Connect two sockets 574 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), 575 SOCK_STREAM); 576 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), 577 SOCK_STREAM); 578 a->Bind(initial_addr); 579 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 580 581 b->Bind(initial_addr); 582 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 583 584 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 585 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 586 ss_->ProcessMessagesUntilIdle(); 587 588 // First, deliver all packets in 0 ms. 589 char buffer[2] = { 0, 0 }; 590 const char cNumPackets = 10; 591 for (char i = 0; i < cNumPackets; ++i) { 592 buffer[0] = '0' + i; 593 EXPECT_EQ(1, a->Send(buffer, 1)); 594 } 595 596 ss_->ProcessMessagesUntilIdle(); 597 598 for (char i = 0; i < cNumPackets; ++i) { 599 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 600 EXPECT_EQ(static_cast<char>('0' + i), buffer[0]); 601 } 602 603 // Next, deliver packets at random intervals 604 const uint32 mean = 50; 605 const uint32 stddev = 50; 606 607 ss_->set_delay_mean(mean); 608 ss_->set_delay_stddev(stddev); 609 ss_->UpdateDelayDistribution(); 610 611 for (char i = 0; i < cNumPackets; ++i) { 612 buffer[0] = 'A' + i; 613 EXPECT_EQ(1, a->Send(buffer, 1)); 614 } 615 616 ss_->ProcessMessagesUntilIdle(); 617 618 for (char i = 0; i < cNumPackets; ++i) { 619 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 620 EXPECT_EQ(static_cast<char>('A' + i), buffer[0]); 621 } 622 } 623 624 void BandwidthTest(const SocketAddress& initial_addr) { 625 AsyncSocket* send_socket = 626 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 627 AsyncSocket* recv_socket = 628 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 629 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 630 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 631 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 632 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 633 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 634 635 uint32 bandwidth = 64 * 1024; 636 ss_->set_bandwidth(bandwidth); 637 638 Thread* pthMain = Thread::Current(); 639 Sender sender(pthMain, send_socket, 80 * 1024); 640 Receiver receiver(pthMain, recv_socket, bandwidth); 641 642 pthMain->ProcessMessages(5000); 643 sender.done = true; 644 pthMain->ProcessMessages(5000); 645 646 ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4); 647 ASSERT_TRUE(receiver.count <= 6 * bandwidth); // queue could drain for 1s 648 649 ss_->set_bandwidth(0); 650 } 651 652 void DelayTest(const SocketAddress& initial_addr) { 653 time_t seed = ::time(NULL); 654 LOG(LS_VERBOSE) << "seed = " << seed; 655 srand(static_cast<unsigned int>(seed)); 656 657 const uint32 mean = 2000; 658 const uint32 stddev = 500; 659 660 ss_->set_delay_mean(mean); 661 ss_->set_delay_stddev(stddev); 662 ss_->UpdateDelayDistribution(); 663 664 AsyncSocket* send_socket = 665 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 666 AsyncSocket* recv_socket = 667 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 668 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 669 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 670 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 671 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 672 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 673 674 Thread* pthMain = Thread::Current(); 675 // Avg packet size is 2K, so at 200KB/s for 10s, we should see about 676 // 1000 packets, which is necessary to get a good distribution. 677 Sender sender(pthMain, send_socket, 100 * 2 * 1024); 678 Receiver receiver(pthMain, recv_socket, 0); 679 680 pthMain->ProcessMessages(10000); 681 sender.done = receiver.done = true; 682 ss_->ProcessMessagesUntilIdle(); 683 684 const double sample_mean = receiver.sum / receiver.samples; 685 double num = 686 receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum; 687 double den = receiver.samples * (receiver.samples - 1); 688 const double sample_stddev = std::sqrt(num / den); 689 LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev; 690 691 EXPECT_LE(500u, receiver.samples); 692 // We initially used a 0.1 fudge factor, but on the build machine, we 693 // have seen the value differ by as much as 0.13. 694 EXPECT_NEAR(mean, sample_mean, 0.15 * mean); 695 EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev); 696 697 ss_->set_delay_mean(0); 698 ss_->set_delay_stddev(0); 699 ss_->UpdateDelayDistribution(); 700 } 701 702 // Test cross-family communication between a client bound to client_addr and a 703 // server bound to server_addr. shouldSucceed indicates if communication is 704 // expected to work or not. 705 void CrossFamilyConnectionTest(const SocketAddress& client_addr, 706 const SocketAddress& server_addr, 707 bool shouldSucceed) { 708 testing::StreamSink sink; 709 SocketAddress accept_address; 710 const SocketAddress kEmptyAddr; 711 712 // Client gets a IPv4 address 713 AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(), 714 SOCK_STREAM); 715 sink.Monitor(client); 716 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 717 EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr); 718 client->Bind(client_addr); 719 720 // Server gets a non-mapped non-any IPv6 address. 721 // IPv4 sockets should not be able to connect to this. 722 AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(), 723 SOCK_STREAM); 724 sink.Monitor(server); 725 server->Bind(server_addr); 726 server->Listen(5); 727 728 if (shouldSucceed) { 729 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 730 ss_->ProcessMessagesUntilIdle(); 731 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 732 Socket* accepted = server->Accept(&accept_address); 733 EXPECT_TRUE(NULL != accepted); 734 EXPECT_NE(kEmptyAddr, accept_address); 735 ss_->ProcessMessagesUntilIdle(); 736 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 737 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 738 } else { 739 // Check that the connection failed. 740 EXPECT_EQ(-1, client->Connect(server->GetLocalAddress())); 741 ss_->ProcessMessagesUntilIdle(); 742 743 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 744 EXPECT_TRUE(NULL == server->Accept(&accept_address)); 745 EXPECT_EQ(accept_address, kEmptyAddr); 746 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 747 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 748 EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr); 749 } 750 } 751 752 // Test cross-family datagram sending between a client bound to client_addr 753 // and a server bound to server_addr. shouldSucceed indicates if sending is 754 // expected to succed or not. 755 void CrossFamilyDatagramTest(const SocketAddress& client_addr, 756 const SocketAddress& server_addr, 757 bool shouldSucceed) { 758 AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM); 759 socket->Bind(server_addr); 760 SocketAddress bound_server_addr = socket->GetLocalAddress(); 761 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 762 763 AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM); 764 socket2->Bind(client_addr); 765 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 766 SocketAddress client2_addr; 767 768 if (shouldSucceed) { 769 EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr)); 770 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 771 SocketAddress client1_addr; 772 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 773 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 774 EXPECT_EQ(client1_addr, bound_server_addr); 775 } else { 776 EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr)); 777 EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0)); 778 } 779 } 780 781 protected: 782 virtual void SetUp() { 783 Thread::Current()->set_socketserver(ss_); 784 } 785 virtual void TearDown() { 786 Thread::Current()->set_socketserver(NULL); 787 } 788 789 VirtualSocketServer* ss_; 790 const SocketAddress kIPv4AnyAddress; 791 const SocketAddress kIPv6AnyAddress; 792 }; 793 794 TEST_F(VirtualSocketServerTest, basic_v4) { 795 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000); 796 BasicTest(ipv4_test_addr); 797 } 798 799 TEST_F(VirtualSocketServerTest, basic_v6) { 800 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000); 801 BasicTest(ipv6_test_addr); 802 } 803 804 TEST_F(VirtualSocketServerTest, connect_v4) { 805 ConnectTest(kIPv4AnyAddress); 806 } 807 808 TEST_F(VirtualSocketServerTest, connect_v6) { 809 ConnectTest(kIPv6AnyAddress); 810 } 811 812 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) { 813 ConnectToNonListenerTest(kIPv4AnyAddress); 814 } 815 816 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) { 817 ConnectToNonListenerTest(kIPv6AnyAddress); 818 } 819 820 TEST_F(VirtualSocketServerTest, close_during_connect_v4) { 821 CloseDuringConnectTest(kIPv4AnyAddress); 822 } 823 824 TEST_F(VirtualSocketServerTest, close_during_connect_v6) { 825 CloseDuringConnectTest(kIPv6AnyAddress); 826 } 827 828 TEST_F(VirtualSocketServerTest, close_v4) { 829 CloseTest(kIPv4AnyAddress); 830 } 831 832 TEST_F(VirtualSocketServerTest, close_v6) { 833 CloseTest(kIPv6AnyAddress); 834 } 835 836 TEST_F(VirtualSocketServerTest, tcp_send_v4) { 837 TcpSendTest(kIPv4AnyAddress); 838 } 839 840 TEST_F(VirtualSocketServerTest, tcp_send_v6) { 841 TcpSendTest(kIPv6AnyAddress); 842 } 843 844 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) { 845 TcpSendsPacketsInOrderTest(kIPv4AnyAddress); 846 } 847 848 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) { 849 TcpSendsPacketsInOrderTest(kIPv6AnyAddress); 850 } 851 852 TEST_F(VirtualSocketServerTest, bandwidth_v4) { 853 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000); 854 BandwidthTest(ipv4_test_addr); 855 } 856 857 TEST_F(VirtualSocketServerTest, bandwidth_v6) { 858 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000); 859 BandwidthTest(ipv6_test_addr); 860 } 861 862 TEST_F(VirtualSocketServerTest, delay_v4) { 863 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000); 864 DelayTest(ipv4_test_addr); 865 } 866 867 TEST_F(VirtualSocketServerTest, delay_v6) { 868 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000); 869 DelayTest(ipv6_test_addr); 870 } 871 872 // Works, receiving socket sees 127.0.0.2. 873 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) { 874 CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0), 875 SocketAddress("0.0.0.0", 5000), 876 true); 877 } 878 879 // Fails. 880 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) { 881 CrossFamilyConnectionTest(SocketAddress("::2", 0), 882 SocketAddress("0.0.0.0", 5000), 883 false); 884 } 885 886 // Fails. 887 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) { 888 CrossFamilyConnectionTest(SocketAddress("::2", 0), 889 SocketAddress("::ffff:127.0.0.1", 5000), 890 false); 891 } 892 893 // Works. receiving socket sees ::ffff:127.0.0.2. 894 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) { 895 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 896 SocketAddress("::", 5000), 897 true); 898 } 899 900 // Fails. 901 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) { 902 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 903 SocketAddress("::1", 5000), 904 false); 905 } 906 907 // Works. Receiving socket sees ::ffff:127.0.0.1. 908 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) { 909 CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0), 910 SocketAddress("::ffff:127.0.0.2", 5000), 911 true); 912 } 913 914 // Works, receiving socket sees a result from GetNextIP. 915 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) { 916 CrossFamilyConnectionTest(SocketAddress("::", 0), 917 SocketAddress("0.0.0.0", 5000), 918 true); 919 } 920 921 // Works, receiving socket sees whatever GetNextIP gave the client. 922 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) { 923 CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0), 924 SocketAddress("::", 5000), 925 true); 926 } 927 928 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) { 929 CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0), 930 SocketAddress("::", 5000), 931 true); 932 } 933 934 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) { 935 CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0), 936 SocketAddress("0.0.0.0", 5000), 937 true); 938 } 939 940 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) { 941 CrossFamilyDatagramTest(SocketAddress("::2", 0), 942 SocketAddress("0.0.0.0", 5000), 943 false); 944 } 945 946 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) { 947 CrossFamilyDatagramTest(SocketAddress("::2", 0), 948 SocketAddress("::ffff:127.0.0.1", 5000), 949 false); 950 } 951 952 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) { 953 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 954 SocketAddress("::", 5000), 955 true); 956 } 957 958 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) { 959 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 960 SocketAddress("::1", 5000), 961 false); 962 } 963 964 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) { 965 CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0), 966 SocketAddress("::ffff:127.0.0.2", 5000), 967 true); 968 } 969 970 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) { 971 CrossFamilyDatagramTest(SocketAddress("::", 0), 972 SocketAddress("0.0.0.0", 5000), 973 true); 974 } 975 976 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) { 977 const uint32 kTestMean[] = { 10, 100, 333, 1000 }; 978 const double kTestDev[] = { 0.25, 0.1, 0.01 }; 979 // TODO: The current code only works for 1000 data points or more. 980 const uint32 kTestSamples[] = { /*10, 100,*/ 1000 }; 981 for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) { 982 for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) { 983 for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) { 984 ASSERT_LT(0u, kTestSamples[sidx]); 985 const uint32 kStdDev = 986 static_cast<uint32>(kTestDev[didx] * kTestMean[midx]); 987 VirtualSocketServer::Function* f = 988 VirtualSocketServer::CreateDistribution(kTestMean[midx], 989 kStdDev, 990 kTestSamples[sidx]); 991 ASSERT_TRUE(NULL != f); 992 ASSERT_EQ(kTestSamples[sidx], f->size()); 993 double sum = 0; 994 for (uint32 i = 0; i < f->size(); ++i) { 995 sum += (*f)[i].second; 996 } 997 const double mean = sum / f->size(); 998 double sum_sq_dev = 0; 999 for (uint32 i = 0; i < f->size(); ++i) { 1000 double dev = (*f)[i].second - mean; 1001 sum_sq_dev += dev * dev; 1002 } 1003 const double stddev = std::sqrt(sum_sq_dev / f->size()); 1004 EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx]) 1005 << "M=" << kTestMean[midx] 1006 << " SD=" << kStdDev 1007 << " N=" << kTestSamples[sidx]; 1008 EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev) 1009 << "M=" << kTestMean[midx] 1010 << " SD=" << kStdDev 1011 << " N=" << kTestSamples[sidx]; 1012 delete f; 1013 } 1014 } 1015 } 1016 } 1017