1 /* 2 * libjingle 3 * Copyright 2006, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <math.h> 29 #include <time.h> 30 #ifdef POSIX 31 #include <netinet/in.h> 32 #endif 33 34 #include "talk/base/logging.h" 35 #include "talk/base/gunit.h" 36 #include "talk/base/testclient.h" 37 #include "talk/base/testutils.h" 38 #include "talk/base/thread.h" 39 #include "talk/base/timeutils.h" 40 #include "talk/base/virtualsocketserver.h" 41 42 using namespace talk_base; 43 44 // Sends at a constant rate but with random packet sizes. 45 struct Sender : public MessageHandler { 46 Sender(Thread* th, AsyncSocket* s, uint32 rt) 47 : thread(th), socket(new AsyncUDPSocket(s)), 48 done(false), rate(rt), count(0) { 49 last_send = talk_base::Time(); 50 thread->PostDelayed(NextDelay(), this, 1); 51 } 52 53 uint32 NextDelay() { 54 uint32 size = (rand() % 4096) + 1; 55 return 1000 * size / rate; 56 } 57 58 void OnMessage(Message* pmsg) { 59 ASSERT_EQ(1u, pmsg->message_id); 60 61 if (done) 62 return; 63 64 uint32 cur_time = talk_base::Time(); 65 uint32 delay = cur_time - last_send; 66 uint32 size = rate * delay / 1000; 67 size = std::min<uint32>(size, 4096); 68 size = std::max<uint32>(size, sizeof(uint32)); 69 70 count += size; 71 memcpy(dummy, &cur_time, sizeof(cur_time)); 72 socket->Send(dummy, size, options); 73 74 last_send = cur_time; 75 thread->PostDelayed(NextDelay(), this, 1); 76 } 77 78 Thread* thread; 79 scoped_ptr<AsyncUDPSocket> socket; 80 talk_base::PacketOptions options; 81 bool done; 82 uint32 rate; // bytes per second 83 uint32 count; 84 uint32 last_send; 85 char dummy[4096]; 86 }; 87 88 struct Receiver : public MessageHandler, public sigslot::has_slots<> { 89 Receiver(Thread* th, AsyncSocket* s, uint32 bw) 90 : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false), 91 count(0), sec_count(0), sum(0), sum_sq(0), samples(0) { 92 socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket); 93 thread->PostDelayed(1000, this, 1); 94 } 95 96 ~Receiver() { 97 thread->Clear(this); 98 } 99 100 void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size, 101 const SocketAddress& remote_addr, 102 const PacketTime& packet_time) { 103 ASSERT_EQ(socket.get(), s); 104 ASSERT_GE(size, 4U); 105 106 count += size; 107 sec_count += size; 108 109 uint32 send_time = *reinterpret_cast<const uint32*>(data); 110 uint32 recv_time = talk_base::Time(); 111 uint32 delay = recv_time - send_time; 112 sum += delay; 113 sum_sq += delay * delay; 114 samples += 1; 115 } 116 117 void OnMessage(Message* pmsg) { 118 ASSERT_EQ(1u, pmsg->message_id); 119 120 if (done) 121 return; 122 123 // It is always possible for us to receive more than expected because 124 // packets can be further delayed in delivery. 125 if (bandwidth > 0) 126 ASSERT_TRUE(sec_count <= 5 * bandwidth / 4); 127 sec_count = 0; 128 thread->PostDelayed(1000, this, 1); 129 } 130 131 Thread* thread; 132 scoped_ptr<AsyncUDPSocket> socket; 133 uint32 bandwidth; 134 bool done; 135 size_t count; 136 size_t sec_count; 137 double sum; 138 double sum_sq; 139 uint32 samples; 140 }; 141 142 class VirtualSocketServerTest : public testing::Test { 143 public: 144 VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)), 145 kIPv4AnyAddress(IPAddress(INADDR_ANY), 0), 146 kIPv6AnyAddress(IPAddress(in6addr_any), 0) { 147 } 148 149 void CheckAddressIncrementalization(const SocketAddress& post, 150 const SocketAddress& pre) { 151 EXPECT_EQ(post.port(), pre.port() + 1); 152 IPAddress post_ip = post.ipaddr(); 153 IPAddress pre_ip = pre.ipaddr(); 154 EXPECT_EQ(pre_ip.family(), post_ip.family()); 155 if (post_ip.family() == AF_INET) { 156 in_addr pre_ipv4 = pre_ip.ipv4_address(); 157 in_addr post_ipv4 = post_ip.ipv4_address(); 158 int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr); 159 EXPECT_EQ(1, difference); 160 } else if (post_ip.family() == AF_INET6) { 161 in6_addr post_ip6 = post_ip.ipv6_address(); 162 in6_addr pre_ip6 = pre_ip.ipv6_address(); 163 uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr); 164 uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr); 165 EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1); 166 } 167 } 168 169 void BasicTest(const SocketAddress& initial_addr) { 170 AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(), 171 SOCK_DGRAM); 172 socket->Bind(initial_addr); 173 SocketAddress server_addr = socket->GetLocalAddress(); 174 // Make sure VSS didn't switch families on us. 175 EXPECT_EQ(server_addr.family(), initial_addr.family()); 176 177 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 178 AsyncSocket* socket2 = 179 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 180 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 181 182 SocketAddress client2_addr; 183 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 184 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 185 186 SocketAddress client1_addr; 187 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 188 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 189 EXPECT_EQ(client1_addr, server_addr); 190 191 SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family()); 192 for (int i = 0; i < 10; i++) { 193 client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty)); 194 195 SocketAddress next_client2_addr; 196 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 197 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr)); 198 CheckAddressIncrementalization(next_client2_addr, client2_addr); 199 // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1); 200 201 SocketAddress server_addr2; 202 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr)); 203 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2)); 204 EXPECT_EQ(server_addr2, server_addr); 205 206 client2_addr = next_client2_addr; 207 } 208 } 209 210 // initial_addr should be made from either INADDR_ANY or in6addr_any. 211 void ConnectTest(const SocketAddress& initial_addr) { 212 testing::StreamSink sink; 213 SocketAddress accept_addr; 214 const SocketAddress kEmptyAddr = 215 EmptySocketAddressWithFamily(initial_addr.family()); 216 217 // Create client 218 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 219 SOCK_STREAM); 220 sink.Monitor(client); 221 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 222 EXPECT_TRUE(client->GetLocalAddress().IsNil()); 223 224 // Create server 225 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 226 SOCK_STREAM); 227 sink.Monitor(server); 228 EXPECT_NE(0, server->Listen(5)); // Bind required 229 EXPECT_EQ(0, server->Bind(initial_addr)); 230 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 231 EXPECT_EQ(0, server->Listen(5)); 232 EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING); 233 234 // No pending server connections 235 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 236 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 237 EXPECT_EQ(AF_UNSPEC, accept_addr.family()); 238 239 // Attempt connect to listening socket 240 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 241 EXPECT_NE(client->GetLocalAddress(), kEmptyAddr); // Implicit Bind 242 EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family()); // Implicit Bind 243 EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress()); 244 245 // Client is connecting 246 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 247 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 248 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 249 250 ss_->ProcessMessagesUntilIdle(); 251 252 // Client still connecting 253 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 254 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 255 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 256 257 // Server has pending connection 258 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 259 Socket* accepted = server->Accept(&accept_addr); 260 EXPECT_TRUE(NULL != accepted); 261 EXPECT_NE(accept_addr, kEmptyAddr); 262 EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr); 263 264 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 265 EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress()); 266 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress()); 267 268 ss_->ProcessMessagesUntilIdle(); 269 270 // Client has connected 271 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED); 272 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 273 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 274 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 275 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress()); 276 } 277 278 void ConnectToNonListenerTest(const SocketAddress& initial_addr) { 279 testing::StreamSink sink; 280 SocketAddress accept_addr; 281 const SocketAddress nil_addr; 282 const SocketAddress empty_addr = 283 EmptySocketAddressWithFamily(initial_addr.family()); 284 285 // Create client 286 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 287 SOCK_STREAM); 288 sink.Monitor(client); 289 290 // Create server 291 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 292 SOCK_STREAM); 293 sink.Monitor(server); 294 EXPECT_EQ(0, server->Bind(initial_addr)); 295 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 296 // Attempt connect to non-listening socket 297 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 298 299 ss_->ProcessMessagesUntilIdle(); 300 301 // No pending server connections 302 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 303 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 304 EXPECT_EQ(accept_addr, nil_addr); 305 306 // Connection failed 307 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 308 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 309 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR)); 310 EXPECT_EQ(client->GetRemoteAddress(), nil_addr); 311 } 312 313 void CloseDuringConnectTest(const SocketAddress& initial_addr) { 314 testing::StreamSink sink; 315 SocketAddress accept_addr; 316 const SocketAddress empty_addr = 317 EmptySocketAddressWithFamily(initial_addr.family()); 318 319 // Create client and server 320 scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(), 321 SOCK_STREAM)); 322 sink.Monitor(client.get()); 323 scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(), 324 SOCK_STREAM)); 325 sink.Monitor(server.get()); 326 327 // Initiate connect 328 EXPECT_EQ(0, server->Bind(initial_addr)); 329 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 330 331 EXPECT_EQ(0, server->Listen(5)); 332 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 333 334 // Server close before socket enters accept queue 335 EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ)); 336 server->Close(); 337 338 ss_->ProcessMessagesUntilIdle(); 339 340 // Result: connection failed 341 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 342 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR)); 343 344 server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM)); 345 sink.Monitor(server.get()); 346 347 // Initiate connect 348 EXPECT_EQ(0, server->Bind(initial_addr)); 349 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 350 351 EXPECT_EQ(0, server->Listen(5)); 352 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 353 354 ss_->ProcessMessagesUntilIdle(); 355 356 // Server close while socket is in accept queue 357 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ)); 358 server->Close(); 359 360 ss_->ProcessMessagesUntilIdle(); 361 362 // Result: connection failed 363 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 364 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR)); 365 366 // New server 367 server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM)); 368 sink.Monitor(server.get()); 369 370 // Initiate connect 371 EXPECT_EQ(0, server->Bind(initial_addr)); 372 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 373 374 EXPECT_EQ(0, server->Listen(5)); 375 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 376 377 ss_->ProcessMessagesUntilIdle(); 378 379 // Server accepts connection 380 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ)); 381 scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr)); 382 ASSERT_TRUE(NULL != accepted.get()); 383 sink.Monitor(accepted.get()); 384 385 // Client closes before connection complets 386 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 387 388 // Connected message has not been processed yet. 389 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 390 client->Close(); 391 392 ss_->ProcessMessagesUntilIdle(); 393 394 // Result: accepted socket closes 395 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED); 396 EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE)); 397 EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE)); 398 } 399 400 void CloseTest(const SocketAddress& initial_addr) { 401 testing::StreamSink sink; 402 const SocketAddress kEmptyAddr; 403 404 // Create clients 405 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 406 sink.Monitor(a); 407 a->Bind(initial_addr); 408 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 409 410 411 scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(), 412 SOCK_STREAM)); 413 sink.Monitor(b.get()); 414 b->Bind(initial_addr); 415 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 416 417 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 418 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 419 420 ss_->ProcessMessagesUntilIdle(); 421 422 EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN)); 423 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED); 424 EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress()); 425 426 EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN)); 427 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED); 428 EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress()); 429 430 EXPECT_EQ(1, a->Send("a", 1)); 431 b->Close(); 432 EXPECT_EQ(1, a->Send("b", 1)); 433 434 ss_->ProcessMessagesUntilIdle(); 435 436 char buffer[10]; 437 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ)); 438 EXPECT_EQ(-1, b->Recv(buffer, 10)); 439 440 EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE)); 441 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED); 442 EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr); 443 444 // No signal for Closer 445 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE)); 446 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED); 447 EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr); 448 } 449 450 void TcpSendTest(const SocketAddress& initial_addr) { 451 testing::StreamSink sink; 452 const SocketAddress kEmptyAddr; 453 454 // Connect two sockets 455 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 456 sink.Monitor(a); 457 a->Bind(initial_addr); 458 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 459 460 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 461 sink.Monitor(b); 462 b->Bind(initial_addr); 463 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 464 465 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 466 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 467 468 ss_->ProcessMessagesUntilIdle(); 469 470 const size_t kBufferSize = 2000; 471 ss_->set_send_buffer_capacity(kBufferSize); 472 ss_->set_recv_buffer_capacity(kBufferSize); 473 474 const size_t kDataSize = 5000; 475 char send_buffer[kDataSize], recv_buffer[kDataSize]; 476 for (size_t i = 0; i < kDataSize; ++i) 477 send_buffer[i] = static_cast<char>(i % 256); 478 memset(recv_buffer, 0, sizeof(recv_buffer)); 479 size_t send_pos = 0, recv_pos = 0; 480 481 // Can't send more than send buffer in one write 482 int result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 483 EXPECT_EQ(static_cast<int>(kBufferSize), result); 484 send_pos += result; 485 486 ss_->ProcessMessagesUntilIdle(); 487 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 488 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 489 490 // Receive buffer is already filled, fill send buffer again 491 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 492 EXPECT_EQ(static_cast<int>(kBufferSize), result); 493 send_pos += result; 494 495 ss_->ProcessMessagesUntilIdle(); 496 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 497 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 498 499 // No more room in send or receive buffer 500 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 501 EXPECT_EQ(-1, result); 502 EXPECT_TRUE(a->IsBlocking()); 503 504 // Read a subset of the data 505 result = b->Recv(recv_buffer + recv_pos, 500); 506 EXPECT_EQ(500, result); 507 recv_pos += result; 508 509 ss_->ProcessMessagesUntilIdle(); 510 EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE)); 511 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 512 513 // Room for more on the sending side 514 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 515 EXPECT_EQ(500, result); 516 send_pos += result; 517 518 // Empty the recv buffer 519 while (true) { 520 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 521 if (result < 0) { 522 EXPECT_EQ(-1, result); 523 EXPECT_TRUE(b->IsBlocking()); 524 break; 525 } 526 recv_pos += result; 527 } 528 529 ss_->ProcessMessagesUntilIdle(); 530 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 531 532 // Continue to empty the recv buffer 533 while (true) { 534 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 535 if (result < 0) { 536 EXPECT_EQ(-1, result); 537 EXPECT_TRUE(b->IsBlocking()); 538 break; 539 } 540 recv_pos += result; 541 } 542 543 // Send last of the data 544 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 545 EXPECT_EQ(500, result); 546 send_pos += result; 547 548 ss_->ProcessMessagesUntilIdle(); 549 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 550 551 // Receive the last of the data 552 while (true) { 553 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 554 if (result < 0) { 555 EXPECT_EQ(-1, result); 556 EXPECT_TRUE(b->IsBlocking()); 557 break; 558 } 559 recv_pos += result; 560 } 561 562 ss_->ProcessMessagesUntilIdle(); 563 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 564 565 // The received data matches the sent data 566 EXPECT_EQ(kDataSize, send_pos); 567 EXPECT_EQ(kDataSize, recv_pos); 568 EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize)); 569 } 570 571 void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) { 572 const SocketAddress kEmptyAddr; 573 574 // Connect two sockets 575 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), 576 SOCK_STREAM); 577 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), 578 SOCK_STREAM); 579 a->Bind(initial_addr); 580 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 581 582 b->Bind(initial_addr); 583 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 584 585 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 586 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 587 ss_->ProcessMessagesUntilIdle(); 588 589 // First, deliver all packets in 0 ms. 590 char buffer[2] = { 0, 0 }; 591 const char cNumPackets = 10; 592 for (char i = 0; i < cNumPackets; ++i) { 593 buffer[0] = '0' + i; 594 EXPECT_EQ(1, a->Send(buffer, 1)); 595 } 596 597 ss_->ProcessMessagesUntilIdle(); 598 599 for (char i = 0; i < cNumPackets; ++i) { 600 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 601 EXPECT_EQ(static_cast<char>('0' + i), buffer[0]); 602 } 603 604 // Next, deliver packets at random intervals 605 const uint32 mean = 50; 606 const uint32 stddev = 50; 607 608 ss_->set_delay_mean(mean); 609 ss_->set_delay_stddev(stddev); 610 ss_->UpdateDelayDistribution(); 611 612 for (char i = 0; i < cNumPackets; ++i) { 613 buffer[0] = 'A' + i; 614 EXPECT_EQ(1, a->Send(buffer, 1)); 615 } 616 617 ss_->ProcessMessagesUntilIdle(); 618 619 for (char i = 0; i < cNumPackets; ++i) { 620 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 621 EXPECT_EQ(static_cast<char>('A' + i), buffer[0]); 622 } 623 } 624 625 void BandwidthTest(const SocketAddress& initial_addr) { 626 AsyncSocket* send_socket = 627 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 628 AsyncSocket* recv_socket = 629 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 630 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 631 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 632 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 633 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 634 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 635 636 uint32 bandwidth = 64 * 1024; 637 ss_->set_bandwidth(bandwidth); 638 639 Thread* pthMain = Thread::Current(); 640 Sender sender(pthMain, send_socket, 80 * 1024); 641 Receiver receiver(pthMain, recv_socket, bandwidth); 642 643 pthMain->ProcessMessages(5000); 644 sender.done = true; 645 pthMain->ProcessMessages(5000); 646 647 ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4); 648 ASSERT_TRUE(receiver.count <= 6 * bandwidth); // queue could drain for 1s 649 650 ss_->set_bandwidth(0); 651 } 652 653 void DelayTest(const SocketAddress& initial_addr) { 654 time_t seed = ::time(NULL); 655 LOG(LS_VERBOSE) << "seed = " << seed; 656 srand(static_cast<unsigned int>(seed)); 657 658 const uint32 mean = 2000; 659 const uint32 stddev = 500; 660 661 ss_->set_delay_mean(mean); 662 ss_->set_delay_stddev(stddev); 663 ss_->UpdateDelayDistribution(); 664 665 AsyncSocket* send_socket = 666 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 667 AsyncSocket* recv_socket = 668 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 669 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 670 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 671 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 672 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 673 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 674 675 Thread* pthMain = Thread::Current(); 676 // Avg packet size is 2K, so at 200KB/s for 10s, we should see about 677 // 1000 packets, which is necessary to get a good distribution. 678 Sender sender(pthMain, send_socket, 100 * 2 * 1024); 679 Receiver receiver(pthMain, recv_socket, 0); 680 681 pthMain->ProcessMessages(10000); 682 sender.done = receiver.done = true; 683 ss_->ProcessMessagesUntilIdle(); 684 685 const double sample_mean = receiver.sum / receiver.samples; 686 double num = 687 receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum; 688 double den = receiver.samples * (receiver.samples - 1); 689 const double sample_stddev = sqrt(num / den); 690 LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev; 691 692 EXPECT_LE(500u, receiver.samples); 693 // We initially used a 0.1 fudge factor, but on the build machine, we 694 // have seen the value differ by as much as 0.13. 695 EXPECT_NEAR(mean, sample_mean, 0.15 * mean); 696 EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev); 697 698 ss_->set_delay_mean(0); 699 ss_->set_delay_stddev(0); 700 ss_->UpdateDelayDistribution(); 701 } 702 703 // Test cross-family communication between a client bound to client_addr and a 704 // server bound to server_addr. shouldSucceed indicates if communication is 705 // expected to work or not. 706 void CrossFamilyConnectionTest(const SocketAddress& client_addr, 707 const SocketAddress& server_addr, 708 bool shouldSucceed) { 709 testing::StreamSink sink; 710 SocketAddress accept_address; 711 const SocketAddress kEmptyAddr; 712 713 // Client gets a IPv4 address 714 AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(), 715 SOCK_STREAM); 716 sink.Monitor(client); 717 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 718 EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr); 719 client->Bind(client_addr); 720 721 // Server gets a non-mapped non-any IPv6 address. 722 // IPv4 sockets should not be able to connect to this. 723 AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(), 724 SOCK_STREAM); 725 sink.Monitor(server); 726 server->Bind(server_addr); 727 server->Listen(5); 728 729 if (shouldSucceed) { 730 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 731 ss_->ProcessMessagesUntilIdle(); 732 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 733 Socket* accepted = server->Accept(&accept_address); 734 EXPECT_TRUE(NULL != accepted); 735 EXPECT_NE(kEmptyAddr, accept_address); 736 ss_->ProcessMessagesUntilIdle(); 737 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 738 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 739 } else { 740 // Check that the connection failed. 741 EXPECT_EQ(-1, client->Connect(server->GetLocalAddress())); 742 ss_->ProcessMessagesUntilIdle(); 743 744 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 745 EXPECT_TRUE(NULL == server->Accept(&accept_address)); 746 EXPECT_EQ(accept_address, kEmptyAddr); 747 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 748 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 749 EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr); 750 } 751 } 752 753 // Test cross-family datagram sending between a client bound to client_addr 754 // and a server bound to server_addr. shouldSucceed indicates if sending is 755 // expected to succed or not. 756 void CrossFamilyDatagramTest(const SocketAddress& client_addr, 757 const SocketAddress& server_addr, 758 bool shouldSucceed) { 759 AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM); 760 socket->Bind(server_addr); 761 SocketAddress bound_server_addr = socket->GetLocalAddress(); 762 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 763 764 AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM); 765 socket2->Bind(client_addr); 766 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 767 SocketAddress client2_addr; 768 769 if (shouldSucceed) { 770 EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr)); 771 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 772 SocketAddress client1_addr; 773 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 774 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 775 EXPECT_EQ(client1_addr, bound_server_addr); 776 } else { 777 EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr)); 778 EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0)); 779 } 780 } 781 782 protected: 783 virtual void SetUp() { 784 Thread::Current()->set_socketserver(ss_); 785 } 786 virtual void TearDown() { 787 Thread::Current()->set_socketserver(NULL); 788 } 789 790 VirtualSocketServer* ss_; 791 const SocketAddress kIPv4AnyAddress; 792 const SocketAddress kIPv6AnyAddress; 793 }; 794 795 TEST_F(VirtualSocketServerTest, basic_v4) { 796 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000); 797 BasicTest(ipv4_test_addr); 798 } 799 800 TEST_F(VirtualSocketServerTest, basic_v6) { 801 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000); 802 BasicTest(ipv6_test_addr); 803 } 804 805 TEST_F(VirtualSocketServerTest, connect_v4) { 806 ConnectTest(kIPv4AnyAddress); 807 } 808 809 TEST_F(VirtualSocketServerTest, connect_v6) { 810 ConnectTest(kIPv6AnyAddress); 811 } 812 813 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) { 814 ConnectToNonListenerTest(kIPv4AnyAddress); 815 } 816 817 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) { 818 ConnectToNonListenerTest(kIPv6AnyAddress); 819 } 820 821 TEST_F(VirtualSocketServerTest, close_during_connect_v4) { 822 CloseDuringConnectTest(kIPv4AnyAddress); 823 } 824 825 TEST_F(VirtualSocketServerTest, close_during_connect_v6) { 826 CloseDuringConnectTest(kIPv6AnyAddress); 827 } 828 829 TEST_F(VirtualSocketServerTest, close_v4) { 830 CloseTest(kIPv4AnyAddress); 831 } 832 833 TEST_F(VirtualSocketServerTest, close_v6) { 834 CloseTest(kIPv6AnyAddress); 835 } 836 837 TEST_F(VirtualSocketServerTest, tcp_send_v4) { 838 TcpSendTest(kIPv4AnyAddress); 839 } 840 841 TEST_F(VirtualSocketServerTest, tcp_send_v6) { 842 TcpSendTest(kIPv6AnyAddress); 843 } 844 845 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) { 846 TcpSendsPacketsInOrderTest(kIPv4AnyAddress); 847 } 848 849 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) { 850 TcpSendsPacketsInOrderTest(kIPv6AnyAddress); 851 } 852 853 TEST_F(VirtualSocketServerTest, bandwidth_v4) { 854 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000); 855 BandwidthTest(ipv4_test_addr); 856 } 857 858 TEST_F(VirtualSocketServerTest, bandwidth_v6) { 859 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000); 860 BandwidthTest(ipv6_test_addr); 861 } 862 863 TEST_F(VirtualSocketServerTest, delay_v4) { 864 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000); 865 DelayTest(ipv4_test_addr); 866 } 867 868 // See: https://code.google.com/p/webrtc/issues/detail?id=2409 869 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) { 870 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000); 871 DelayTest(ipv6_test_addr); 872 } 873 874 // Works, receiving socket sees 127.0.0.2. 875 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) { 876 CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0), 877 SocketAddress("0.0.0.0", 5000), 878 true); 879 } 880 881 // Fails. 882 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) { 883 CrossFamilyConnectionTest(SocketAddress("::2", 0), 884 SocketAddress("0.0.0.0", 5000), 885 false); 886 } 887 888 // Fails. 889 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) { 890 CrossFamilyConnectionTest(SocketAddress("::2", 0), 891 SocketAddress("::ffff:127.0.0.1", 5000), 892 false); 893 } 894 895 // Works. receiving socket sees ::ffff:127.0.0.2. 896 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) { 897 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 898 SocketAddress("::", 5000), 899 true); 900 } 901 902 // Fails. 903 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) { 904 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 905 SocketAddress("::1", 5000), 906 false); 907 } 908 909 // Works. Receiving socket sees ::ffff:127.0.0.1. 910 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) { 911 CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0), 912 SocketAddress("::ffff:127.0.0.2", 5000), 913 true); 914 } 915 916 // Works, receiving socket sees a result from GetNextIP. 917 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) { 918 CrossFamilyConnectionTest(SocketAddress("::", 0), 919 SocketAddress("0.0.0.0", 5000), 920 true); 921 } 922 923 // Works, receiving socket sees whatever GetNextIP gave the client. 924 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) { 925 CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0), 926 SocketAddress("::", 5000), 927 true); 928 } 929 930 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) { 931 CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0), 932 SocketAddress("::", 5000), 933 true); 934 } 935 936 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) { 937 CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0), 938 SocketAddress("0.0.0.0", 5000), 939 true); 940 } 941 942 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) { 943 CrossFamilyDatagramTest(SocketAddress("::2", 0), 944 SocketAddress("0.0.0.0", 5000), 945 false); 946 } 947 948 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) { 949 CrossFamilyDatagramTest(SocketAddress("::2", 0), 950 SocketAddress("::ffff:127.0.0.1", 5000), 951 false); 952 } 953 954 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) { 955 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 956 SocketAddress("::", 5000), 957 true); 958 } 959 960 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) { 961 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 962 SocketAddress("::1", 5000), 963 false); 964 } 965 966 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) { 967 CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0), 968 SocketAddress("::ffff:127.0.0.2", 5000), 969 true); 970 } 971 972 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) { 973 CrossFamilyDatagramTest(SocketAddress("::", 0), 974 SocketAddress("0.0.0.0", 5000), 975 true); 976 } 977 978 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) { 979 const uint32 kTestMean[] = { 10, 100, 333, 1000 }; 980 const double kTestDev[] = { 0.25, 0.1, 0.01 }; 981 // TODO: The current code only works for 1000 data points or more. 982 const uint32 kTestSamples[] = { /*10, 100,*/ 1000 }; 983 for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) { 984 for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) { 985 for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) { 986 ASSERT_LT(0u, kTestSamples[sidx]); 987 const uint32 kStdDev = 988 static_cast<uint32>(kTestDev[didx] * kTestMean[midx]); 989 VirtualSocketServer::Function* f = 990 VirtualSocketServer::CreateDistribution(kTestMean[midx], 991 kStdDev, 992 kTestSamples[sidx]); 993 ASSERT_TRUE(NULL != f); 994 ASSERT_EQ(kTestSamples[sidx], f->size()); 995 double sum = 0; 996 for (uint32 i = 0; i < f->size(); ++i) { 997 sum += (*f)[i].second; 998 } 999 const double mean = sum / f->size(); 1000 double sum_sq_dev = 0; 1001 for (uint32 i = 0; i < f->size(); ++i) { 1002 double dev = (*f)[i].second - mean; 1003 sum_sq_dev += dev * dev; 1004 } 1005 const double stddev = sqrt(sum_sq_dev / f->size()); 1006 EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx]) 1007 << "M=" << kTestMean[midx] 1008 << " SD=" << kStdDev 1009 << " N=" << kTestSamples[sidx]; 1010 EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev) 1011 << "M=" << kTestMean[midx] 1012 << " SD=" << kStdDev 1013 << " N=" << kTestSamples[sidx]; 1014 delete f; 1015 } 1016 } 1017 } 1018 } 1019