1 /* 2 * Copyright 2006 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include <math.h> 12 #include <time.h> 13 #if defined(WEBRTC_POSIX) 14 #include <netinet/in.h> 15 #endif 16 17 #include "webrtc/base/arraysize.h" 18 #include "webrtc/base/logging.h" 19 #include "webrtc/base/gunit.h" 20 #include "webrtc/base/testclient.h" 21 #include "webrtc/base/testutils.h" 22 #include "webrtc/base/thread.h" 23 #include "webrtc/base/timeutils.h" 24 #include "webrtc/base/virtualsocketserver.h" 25 26 using namespace rtc; 27 28 // Sends at a constant rate but with random packet sizes. 29 struct Sender : public MessageHandler { 30 Sender(Thread* th, AsyncSocket* s, uint32_t rt) 31 : thread(th), 32 socket(new AsyncUDPSocket(s)), 33 done(false), 34 rate(rt), 35 count(0) { 36 last_send = rtc::Time(); 37 thread->PostDelayed(NextDelay(), this, 1); 38 } 39 40 uint32_t NextDelay() { 41 uint32_t size = (rand() % 4096) + 1; 42 return 1000 * size / rate; 43 } 44 45 void OnMessage(Message* pmsg) { 46 ASSERT_EQ(1u, pmsg->message_id); 47 48 if (done) 49 return; 50 51 uint32_t cur_time = rtc::Time(); 52 uint32_t delay = cur_time - last_send; 53 uint32_t size = rate * delay / 1000; 54 size = std::min<uint32_t>(size, 4096); 55 size = std::max<uint32_t>(size, sizeof(uint32_t)); 56 57 count += size; 58 memcpy(dummy, &cur_time, sizeof(cur_time)); 59 socket->Send(dummy, size, options); 60 61 last_send = cur_time; 62 thread->PostDelayed(NextDelay(), this, 1); 63 } 64 65 Thread* thread; 66 scoped_ptr<AsyncUDPSocket> socket; 67 rtc::PacketOptions options; 68 bool done; 69 uint32_t rate; // bytes per second 70 uint32_t count; 71 uint32_t last_send; 72 char dummy[4096]; 73 }; 74 75 struct Receiver : public MessageHandler, public sigslot::has_slots<> { 76 Receiver(Thread* th, AsyncSocket* s, uint32_t bw) 77 : thread(th), 78 socket(new AsyncUDPSocket(s)), 79 bandwidth(bw), 80 done(false), 81 count(0), 82 sec_count(0), 83 sum(0), 84 sum_sq(0), 85 samples(0) { 86 socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket); 87 thread->PostDelayed(1000, this, 1); 88 } 89 90 ~Receiver() { 91 thread->Clear(this); 92 } 93 94 void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size, 95 const SocketAddress& remote_addr, 96 const PacketTime& packet_time) { 97 ASSERT_EQ(socket.get(), s); 98 ASSERT_GE(size, 4U); 99 100 count += size; 101 sec_count += size; 102 103 uint32_t send_time = *reinterpret_cast<const uint32_t*>(data); 104 uint32_t recv_time = rtc::Time(); 105 uint32_t delay = recv_time - send_time; 106 sum += delay; 107 sum_sq += delay * delay; 108 samples += 1; 109 } 110 111 void OnMessage(Message* pmsg) { 112 ASSERT_EQ(1u, pmsg->message_id); 113 114 if (done) 115 return; 116 117 // It is always possible for us to receive more than expected because 118 // packets can be further delayed in delivery. 119 if (bandwidth > 0) 120 ASSERT_TRUE(sec_count <= 5 * bandwidth / 4); 121 sec_count = 0; 122 thread->PostDelayed(1000, this, 1); 123 } 124 125 Thread* thread; 126 scoped_ptr<AsyncUDPSocket> socket; 127 uint32_t bandwidth; 128 bool done; 129 size_t count; 130 size_t sec_count; 131 double sum; 132 double sum_sq; 133 uint32_t samples; 134 }; 135 136 class VirtualSocketServerTest : public testing::Test { 137 public: 138 VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)), 139 kIPv4AnyAddress(IPAddress(INADDR_ANY), 0), 140 kIPv6AnyAddress(IPAddress(in6addr_any), 0) { 141 } 142 143 void CheckPortIncrementalization(const SocketAddress& post, 144 const SocketAddress& pre) { 145 EXPECT_EQ(post.port(), pre.port() + 1); 146 IPAddress post_ip = post.ipaddr(); 147 IPAddress pre_ip = pre.ipaddr(); 148 EXPECT_EQ(pre_ip.family(), post_ip.family()); 149 if (post_ip.family() == AF_INET) { 150 in_addr pre_ipv4 = pre_ip.ipv4_address(); 151 in_addr post_ipv4 = post_ip.ipv4_address(); 152 EXPECT_EQ(post_ipv4.s_addr, pre_ipv4.s_addr); 153 } else if (post_ip.family() == AF_INET6) { 154 in6_addr post_ip6 = post_ip.ipv6_address(); 155 in6_addr pre_ip6 = pre_ip.ipv6_address(); 156 uint32_t* post_as_ints = reinterpret_cast<uint32_t*>(&post_ip6.s6_addr); 157 uint32_t* pre_as_ints = reinterpret_cast<uint32_t*>(&pre_ip6.s6_addr); 158 EXPECT_EQ(post_as_ints[3], pre_as_ints[3]); 159 } 160 } 161 162 // Test a client can bind to the any address, and all sent packets will have 163 // the default route as the source address. Also, it can receive packets sent 164 // to the default route. 165 void TestDefaultRoute(const IPAddress& default_route) { 166 ss_->SetDefaultRoute(default_route); 167 168 // Create client1 bound to the any address. 169 AsyncSocket* socket = 170 ss_->CreateAsyncSocket(default_route.family(), SOCK_DGRAM); 171 socket->Bind(EmptySocketAddressWithFamily(default_route.family())); 172 SocketAddress client1_any_addr = socket->GetLocalAddress(); 173 EXPECT_TRUE(client1_any_addr.IsAnyIP()); 174 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 175 176 // Create client2 bound to the default route. 177 AsyncSocket* socket2 = 178 ss_->CreateAsyncSocket(default_route.family(), SOCK_DGRAM); 179 socket2->Bind(SocketAddress(default_route, 0)); 180 SocketAddress client2_addr = socket2->GetLocalAddress(); 181 EXPECT_FALSE(client2_addr.IsAnyIP()); 182 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 183 184 // Client1 sends to client2, client2 should see the default route as 185 // client1's address. 186 SocketAddress client1_addr; 187 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 188 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 189 EXPECT_EQ(client1_addr, 190 SocketAddress(default_route, client1_any_addr.port())); 191 192 // Client2 can send back to client1's default route address. 193 EXPECT_EQ(3, client2->SendTo("foo", 3, client1_addr)); 194 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 195 } 196 197 void BasicTest(const SocketAddress& initial_addr) { 198 AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(), 199 SOCK_DGRAM); 200 socket->Bind(initial_addr); 201 SocketAddress server_addr = socket->GetLocalAddress(); 202 // Make sure VSS didn't switch families on us. 203 EXPECT_EQ(server_addr.family(), initial_addr.family()); 204 205 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 206 AsyncSocket* socket2 = 207 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 208 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 209 210 SocketAddress client2_addr; 211 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 212 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 213 214 SocketAddress client1_addr; 215 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 216 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 217 EXPECT_EQ(client1_addr, server_addr); 218 219 SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family()); 220 for (int i = 0; i < 10; i++) { 221 client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty)); 222 223 SocketAddress next_client2_addr; 224 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr)); 225 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr)); 226 CheckPortIncrementalization(next_client2_addr, client2_addr); 227 // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1); 228 229 SocketAddress server_addr2; 230 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr)); 231 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2)); 232 EXPECT_EQ(server_addr2, server_addr); 233 234 client2_addr = next_client2_addr; 235 } 236 } 237 238 // initial_addr should be made from either INADDR_ANY or in6addr_any. 239 void ConnectTest(const SocketAddress& initial_addr) { 240 testing::StreamSink sink; 241 SocketAddress accept_addr; 242 const SocketAddress kEmptyAddr = 243 EmptySocketAddressWithFamily(initial_addr.family()); 244 245 // Create client 246 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 247 SOCK_STREAM); 248 sink.Monitor(client); 249 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 250 EXPECT_TRUE(client->GetLocalAddress().IsNil()); 251 252 // Create server 253 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 254 SOCK_STREAM); 255 sink.Monitor(server); 256 EXPECT_NE(0, server->Listen(5)); // Bind required 257 EXPECT_EQ(0, server->Bind(initial_addr)); 258 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 259 EXPECT_EQ(0, server->Listen(5)); 260 EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING); 261 262 // No pending server connections 263 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 264 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 265 EXPECT_EQ(AF_UNSPEC, accept_addr.family()); 266 267 // Attempt connect to listening socket 268 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 269 EXPECT_NE(client->GetLocalAddress(), kEmptyAddr); // Implicit Bind 270 EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family()); // Implicit Bind 271 EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress()); 272 273 // Client is connecting 274 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 275 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 276 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 277 278 ss_->ProcessMessagesUntilIdle(); 279 280 // Client still connecting 281 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 282 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 283 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 284 285 // Server has pending connection 286 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 287 Socket* accepted = server->Accept(&accept_addr); 288 EXPECT_TRUE(NULL != accepted); 289 EXPECT_NE(accept_addr, kEmptyAddr); 290 EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr); 291 292 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 293 EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress()); 294 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress()); 295 296 ss_->ProcessMessagesUntilIdle(); 297 298 // Client has connected 299 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED); 300 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 301 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE)); 302 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 303 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress()); 304 } 305 306 void ConnectToNonListenerTest(const SocketAddress& initial_addr) { 307 testing::StreamSink sink; 308 SocketAddress accept_addr; 309 const SocketAddress nil_addr; 310 const SocketAddress empty_addr = 311 EmptySocketAddressWithFamily(initial_addr.family()); 312 313 // Create client 314 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(), 315 SOCK_STREAM); 316 sink.Monitor(client); 317 318 // Create server 319 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(), 320 SOCK_STREAM); 321 sink.Monitor(server); 322 EXPECT_EQ(0, server->Bind(initial_addr)); 323 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 324 // Attempt connect to non-listening socket 325 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 326 327 ss_->ProcessMessagesUntilIdle(); 328 329 // No pending server connections 330 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 331 EXPECT_TRUE(NULL == server->Accept(&accept_addr)); 332 EXPECT_EQ(accept_addr, nil_addr); 333 334 // Connection failed 335 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 336 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 337 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR)); 338 EXPECT_EQ(client->GetRemoteAddress(), nil_addr); 339 } 340 341 void CloseDuringConnectTest(const SocketAddress& initial_addr) { 342 testing::StreamSink sink; 343 SocketAddress accept_addr; 344 const SocketAddress empty_addr = 345 EmptySocketAddressWithFamily(initial_addr.family()); 346 347 // Create client and server 348 scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(), 349 SOCK_STREAM)); 350 sink.Monitor(client.get()); 351 scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(), 352 SOCK_STREAM)); 353 sink.Monitor(server.get()); 354 355 // Initiate connect 356 EXPECT_EQ(0, server->Bind(initial_addr)); 357 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 358 359 EXPECT_EQ(0, server->Listen(5)); 360 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 361 362 // Server close before socket enters accept queue 363 EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ)); 364 server->Close(); 365 366 ss_->ProcessMessagesUntilIdle(); 367 368 // Result: connection failed 369 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 370 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR)); 371 372 server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM)); 373 sink.Monitor(server.get()); 374 375 // Initiate connect 376 EXPECT_EQ(0, server->Bind(initial_addr)); 377 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 378 379 EXPECT_EQ(0, server->Listen(5)); 380 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 381 382 ss_->ProcessMessagesUntilIdle(); 383 384 // Server close while socket is in accept queue 385 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ)); 386 server->Close(); 387 388 ss_->ProcessMessagesUntilIdle(); 389 390 // Result: connection failed 391 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 392 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR)); 393 394 // New server 395 server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM)); 396 sink.Monitor(server.get()); 397 398 // Initiate connect 399 EXPECT_EQ(0, server->Bind(initial_addr)); 400 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family()); 401 402 EXPECT_EQ(0, server->Listen(5)); 403 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 404 405 ss_->ProcessMessagesUntilIdle(); 406 407 // Server accepts connection 408 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ)); 409 scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr)); 410 ASSERT_TRUE(NULL != accepted.get()); 411 sink.Monitor(accepted.get()); 412 413 // Client closes before connection complets 414 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED); 415 416 // Connected message has not been processed yet. 417 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING); 418 client->Close(); 419 420 ss_->ProcessMessagesUntilIdle(); 421 422 // Result: accepted socket closes 423 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED); 424 EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE)); 425 EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE)); 426 } 427 428 void CloseTest(const SocketAddress& initial_addr) { 429 testing::StreamSink sink; 430 const SocketAddress kEmptyAddr; 431 432 // Create clients 433 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 434 sink.Monitor(a); 435 a->Bind(initial_addr); 436 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 437 438 439 scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(), 440 SOCK_STREAM)); 441 sink.Monitor(b.get()); 442 b->Bind(initial_addr); 443 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 444 445 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 446 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 447 448 ss_->ProcessMessagesUntilIdle(); 449 450 EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN)); 451 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED); 452 EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress()); 453 454 EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN)); 455 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED); 456 EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress()); 457 458 EXPECT_EQ(1, a->Send("a", 1)); 459 b->Close(); 460 EXPECT_EQ(1, a->Send("b", 1)); 461 462 ss_->ProcessMessagesUntilIdle(); 463 464 char buffer[10]; 465 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ)); 466 EXPECT_EQ(-1, b->Recv(buffer, 10)); 467 468 EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE)); 469 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED); 470 EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr); 471 472 // No signal for Closer 473 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE)); 474 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED); 475 EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr); 476 } 477 478 void TcpSendTest(const SocketAddress& initial_addr) { 479 testing::StreamSink sink; 480 const SocketAddress kEmptyAddr; 481 482 // Connect two sockets 483 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 484 sink.Monitor(a); 485 a->Bind(initial_addr); 486 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 487 488 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM); 489 sink.Monitor(b); 490 b->Bind(initial_addr); 491 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 492 493 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 494 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 495 496 ss_->ProcessMessagesUntilIdle(); 497 498 const size_t kBufferSize = 2000; 499 ss_->set_send_buffer_capacity(kBufferSize); 500 ss_->set_recv_buffer_capacity(kBufferSize); 501 502 const size_t kDataSize = 5000; 503 char send_buffer[kDataSize], recv_buffer[kDataSize]; 504 for (size_t i = 0; i < kDataSize; ++i) 505 send_buffer[i] = static_cast<char>(i % 256); 506 memset(recv_buffer, 0, sizeof(recv_buffer)); 507 size_t send_pos = 0, recv_pos = 0; 508 509 // Can't send more than send buffer in one write 510 int result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 511 EXPECT_EQ(static_cast<int>(kBufferSize), result); 512 send_pos += result; 513 514 ss_->ProcessMessagesUntilIdle(); 515 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 516 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 517 518 // Receive buffer is already filled, fill send buffer again 519 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 520 EXPECT_EQ(static_cast<int>(kBufferSize), result); 521 send_pos += result; 522 523 ss_->ProcessMessagesUntilIdle(); 524 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE)); 525 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 526 527 // No more room in send or receive buffer 528 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 529 EXPECT_EQ(-1, result); 530 EXPECT_TRUE(a->IsBlocking()); 531 532 // Read a subset of the data 533 result = b->Recv(recv_buffer + recv_pos, 500); 534 EXPECT_EQ(500, result); 535 recv_pos += result; 536 537 ss_->ProcessMessagesUntilIdle(); 538 EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE)); 539 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 540 541 // Room for more on the sending side 542 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 543 EXPECT_EQ(500, result); 544 send_pos += result; 545 546 // Empty the recv buffer 547 while (true) { 548 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 549 if (result < 0) { 550 EXPECT_EQ(-1, result); 551 EXPECT_TRUE(b->IsBlocking()); 552 break; 553 } 554 recv_pos += result; 555 } 556 557 ss_->ProcessMessagesUntilIdle(); 558 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 559 560 // Continue to empty the recv buffer 561 while (true) { 562 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 563 if (result < 0) { 564 EXPECT_EQ(-1, result); 565 EXPECT_TRUE(b->IsBlocking()); 566 break; 567 } 568 recv_pos += result; 569 } 570 571 // Send last of the data 572 result = a->Send(send_buffer + send_pos, kDataSize - send_pos); 573 EXPECT_EQ(500, result); 574 send_pos += result; 575 576 ss_->ProcessMessagesUntilIdle(); 577 EXPECT_TRUE(sink.Check(b, testing::SSE_READ)); 578 579 // Receive the last of the data 580 while (true) { 581 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos); 582 if (result < 0) { 583 EXPECT_EQ(-1, result); 584 EXPECT_TRUE(b->IsBlocking()); 585 break; 586 } 587 recv_pos += result; 588 } 589 590 ss_->ProcessMessagesUntilIdle(); 591 EXPECT_FALSE(sink.Check(b, testing::SSE_READ)); 592 593 // The received data matches the sent data 594 EXPECT_EQ(kDataSize, send_pos); 595 EXPECT_EQ(kDataSize, recv_pos); 596 EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize)); 597 } 598 599 void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) { 600 const SocketAddress kEmptyAddr; 601 602 // Connect two sockets 603 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), 604 SOCK_STREAM); 605 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), 606 SOCK_STREAM); 607 a->Bind(initial_addr); 608 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family()); 609 610 b->Bind(initial_addr); 611 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family()); 612 613 EXPECT_EQ(0, a->Connect(b->GetLocalAddress())); 614 EXPECT_EQ(0, b->Connect(a->GetLocalAddress())); 615 ss_->ProcessMessagesUntilIdle(); 616 617 // First, deliver all packets in 0 ms. 618 char buffer[2] = { 0, 0 }; 619 const char cNumPackets = 10; 620 for (char i = 0; i < cNumPackets; ++i) { 621 buffer[0] = '0' + i; 622 EXPECT_EQ(1, a->Send(buffer, 1)); 623 } 624 625 ss_->ProcessMessagesUntilIdle(); 626 627 for (char i = 0; i < cNumPackets; ++i) { 628 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 629 EXPECT_EQ(static_cast<char>('0' + i), buffer[0]); 630 } 631 632 // Next, deliver packets at random intervals 633 const uint32_t mean = 50; 634 const uint32_t stddev = 50; 635 636 ss_->set_delay_mean(mean); 637 ss_->set_delay_stddev(stddev); 638 ss_->UpdateDelayDistribution(); 639 640 for (char i = 0; i < cNumPackets; ++i) { 641 buffer[0] = 'A' + i; 642 EXPECT_EQ(1, a->Send(buffer, 1)); 643 } 644 645 ss_->ProcessMessagesUntilIdle(); 646 647 for (char i = 0; i < cNumPackets; ++i) { 648 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer))); 649 EXPECT_EQ(static_cast<char>('A' + i), buffer[0]); 650 } 651 } 652 653 // It is important that initial_addr's port has to be 0 such that the 654 // incremental port behavior could ensure the 2 Binds result in different 655 // address. 656 void BandwidthTest(const SocketAddress& initial_addr) { 657 AsyncSocket* send_socket = 658 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 659 AsyncSocket* recv_socket = 660 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 661 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 662 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 663 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 664 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 665 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 666 667 uint32_t bandwidth = 64 * 1024; 668 ss_->set_bandwidth(bandwidth); 669 670 Thread* pthMain = Thread::Current(); 671 Sender sender(pthMain, send_socket, 80 * 1024); 672 Receiver receiver(pthMain, recv_socket, bandwidth); 673 674 pthMain->ProcessMessages(5000); 675 sender.done = true; 676 pthMain->ProcessMessages(5000); 677 678 ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4); 679 ASSERT_TRUE(receiver.count <= 6 * bandwidth); // queue could drain for 1s 680 681 ss_->set_bandwidth(0); 682 } 683 684 // It is important that initial_addr's port has to be 0 such that the 685 // incremental port behavior could ensure the 2 Binds result in different 686 // address. 687 void DelayTest(const SocketAddress& initial_addr) { 688 time_t seed = ::time(NULL); 689 LOG(LS_VERBOSE) << "seed = " << seed; 690 srand(static_cast<unsigned int>(seed)); 691 692 const uint32_t mean = 2000; 693 const uint32_t stddev = 500; 694 695 ss_->set_delay_mean(mean); 696 ss_->set_delay_stddev(stddev); 697 ss_->UpdateDelayDistribution(); 698 699 AsyncSocket* send_socket = 700 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 701 AsyncSocket* recv_socket = 702 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM); 703 ASSERT_EQ(0, send_socket->Bind(initial_addr)); 704 ASSERT_EQ(0, recv_socket->Bind(initial_addr)); 705 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family()); 706 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family()); 707 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress())); 708 709 Thread* pthMain = Thread::Current(); 710 // Avg packet size is 2K, so at 200KB/s for 10s, we should see about 711 // 1000 packets, which is necessary to get a good distribution. 712 Sender sender(pthMain, send_socket, 100 * 2 * 1024); 713 Receiver receiver(pthMain, recv_socket, 0); 714 715 pthMain->ProcessMessages(10000); 716 sender.done = receiver.done = true; 717 ss_->ProcessMessagesUntilIdle(); 718 719 const double sample_mean = receiver.sum / receiver.samples; 720 double num = 721 receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum; 722 double den = receiver.samples * (receiver.samples - 1); 723 const double sample_stddev = sqrt(num / den); 724 LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev; 725 726 EXPECT_LE(500u, receiver.samples); 727 // We initially used a 0.1 fudge factor, but on the build machine, we 728 // have seen the value differ by as much as 0.13. 729 EXPECT_NEAR(mean, sample_mean, 0.15 * mean); 730 EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev); 731 732 ss_->set_delay_mean(0); 733 ss_->set_delay_stddev(0); 734 ss_->UpdateDelayDistribution(); 735 } 736 737 // Test cross-family communication between a client bound to client_addr and a 738 // server bound to server_addr. shouldSucceed indicates if communication is 739 // expected to work or not. 740 void CrossFamilyConnectionTest(const SocketAddress& client_addr, 741 const SocketAddress& server_addr, 742 bool shouldSucceed) { 743 testing::StreamSink sink; 744 SocketAddress accept_address; 745 const SocketAddress kEmptyAddr; 746 747 // Client gets a IPv4 address 748 AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(), 749 SOCK_STREAM); 750 sink.Monitor(client); 751 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 752 EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr); 753 client->Bind(client_addr); 754 755 // Server gets a non-mapped non-any IPv6 address. 756 // IPv4 sockets should not be able to connect to this. 757 AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(), 758 SOCK_STREAM); 759 sink.Monitor(server); 760 server->Bind(server_addr); 761 server->Listen(5); 762 763 if (shouldSucceed) { 764 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 765 ss_->ProcessMessagesUntilIdle(); 766 EXPECT_TRUE(sink.Check(server, testing::SSE_READ)); 767 Socket* accepted = server->Accept(&accept_address); 768 EXPECT_TRUE(NULL != accepted); 769 EXPECT_NE(kEmptyAddr, accept_address); 770 ss_->ProcessMessagesUntilIdle(); 771 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN)); 772 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress()); 773 } else { 774 // Check that the connection failed. 775 EXPECT_EQ(-1, client->Connect(server->GetLocalAddress())); 776 ss_->ProcessMessagesUntilIdle(); 777 778 EXPECT_FALSE(sink.Check(server, testing::SSE_READ)); 779 EXPECT_TRUE(NULL == server->Accept(&accept_address)); 780 EXPECT_EQ(accept_address, kEmptyAddr); 781 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED); 782 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN)); 783 EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr); 784 } 785 } 786 787 // Test cross-family datagram sending between a client bound to client_addr 788 // and a server bound to server_addr. shouldSucceed indicates if sending is 789 // expected to succeed or not. 790 void CrossFamilyDatagramTest(const SocketAddress& client_addr, 791 const SocketAddress& server_addr, 792 bool shouldSucceed) { 793 AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM); 794 socket->Bind(server_addr); 795 SocketAddress bound_server_addr = socket->GetLocalAddress(); 796 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket)); 797 798 AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM); 799 socket2->Bind(client_addr); 800 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2)); 801 SocketAddress client2_addr; 802 803 if (shouldSucceed) { 804 EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr)); 805 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr)); 806 SocketAddress client1_addr; 807 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr)); 808 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr)); 809 EXPECT_EQ(client1_addr, bound_server_addr); 810 } else { 811 EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr)); 812 EXPECT_TRUE(client1->CheckNoPacket()); 813 } 814 } 815 816 protected: 817 virtual void SetUp() { 818 Thread::Current()->set_socketserver(ss_); 819 } 820 virtual void TearDown() { 821 Thread::Current()->set_socketserver(NULL); 822 } 823 824 VirtualSocketServer* ss_; 825 const SocketAddress kIPv4AnyAddress; 826 const SocketAddress kIPv6AnyAddress; 827 }; 828 829 TEST_F(VirtualSocketServerTest, basic_v4) { 830 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000); 831 BasicTest(ipv4_test_addr); 832 } 833 834 TEST_F(VirtualSocketServerTest, basic_v6) { 835 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000); 836 BasicTest(ipv6_test_addr); 837 } 838 839 TEST_F(VirtualSocketServerTest, TestDefaultRoute_v4) { 840 IPAddress ipv4_default_addr(0x01020304); 841 TestDefaultRoute(ipv4_default_addr); 842 } 843 844 TEST_F(VirtualSocketServerTest, TestDefaultRoute_v6) { 845 IPAddress ipv6_default_addr; 846 EXPECT_TRUE( 847 IPFromString("2401:fa00:4:1000:be30:5bff:fee5:c3", &ipv6_default_addr)); 848 TestDefaultRoute(ipv6_default_addr); 849 } 850 851 TEST_F(VirtualSocketServerTest, connect_v4) { 852 ConnectTest(kIPv4AnyAddress); 853 } 854 855 TEST_F(VirtualSocketServerTest, connect_v6) { 856 ConnectTest(kIPv6AnyAddress); 857 } 858 859 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) { 860 ConnectToNonListenerTest(kIPv4AnyAddress); 861 } 862 863 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) { 864 ConnectToNonListenerTest(kIPv6AnyAddress); 865 } 866 867 TEST_F(VirtualSocketServerTest, close_during_connect_v4) { 868 CloseDuringConnectTest(kIPv4AnyAddress); 869 } 870 871 TEST_F(VirtualSocketServerTest, close_during_connect_v6) { 872 CloseDuringConnectTest(kIPv6AnyAddress); 873 } 874 875 TEST_F(VirtualSocketServerTest, close_v4) { 876 CloseTest(kIPv4AnyAddress); 877 } 878 879 TEST_F(VirtualSocketServerTest, close_v6) { 880 CloseTest(kIPv6AnyAddress); 881 } 882 883 TEST_F(VirtualSocketServerTest, tcp_send_v4) { 884 TcpSendTest(kIPv4AnyAddress); 885 } 886 887 TEST_F(VirtualSocketServerTest, tcp_send_v6) { 888 TcpSendTest(kIPv6AnyAddress); 889 } 890 891 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) { 892 TcpSendsPacketsInOrderTest(kIPv4AnyAddress); 893 } 894 895 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) { 896 TcpSendsPacketsInOrderTest(kIPv6AnyAddress); 897 } 898 899 TEST_F(VirtualSocketServerTest, bandwidth_v4) { 900 BandwidthTest(kIPv4AnyAddress); 901 } 902 903 TEST_F(VirtualSocketServerTest, bandwidth_v6) { 904 BandwidthTest(kIPv6AnyAddress); 905 } 906 907 TEST_F(VirtualSocketServerTest, delay_v4) { 908 DelayTest(kIPv4AnyAddress); 909 } 910 911 // See: https://code.google.com/p/webrtc/issues/detail?id=2409 912 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) { 913 DelayTest(kIPv6AnyAddress); 914 } 915 916 // Works, receiving socket sees 127.0.0.2. 917 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) { 918 CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0), 919 SocketAddress("0.0.0.0", 5000), 920 true); 921 } 922 923 // Fails. 924 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) { 925 CrossFamilyConnectionTest(SocketAddress("::2", 0), 926 SocketAddress("0.0.0.0", 5000), 927 false); 928 } 929 930 // Fails. 931 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) { 932 CrossFamilyConnectionTest(SocketAddress("::2", 0), 933 SocketAddress("::ffff:127.0.0.1", 5000), 934 false); 935 } 936 937 // Works. receiving socket sees ::ffff:127.0.0.2. 938 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) { 939 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 940 SocketAddress("::", 5000), 941 true); 942 } 943 944 // Fails. 945 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) { 946 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0), 947 SocketAddress("::1", 5000), 948 false); 949 } 950 951 // Works. Receiving socket sees ::ffff:127.0.0.1. 952 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) { 953 CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0), 954 SocketAddress("::ffff:127.0.0.2", 5000), 955 true); 956 } 957 958 // Works, receiving socket sees a result from GetNextIP. 959 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) { 960 CrossFamilyConnectionTest(SocketAddress("::", 0), 961 SocketAddress("0.0.0.0", 5000), 962 true); 963 } 964 965 // Works, receiving socket sees whatever GetNextIP gave the client. 966 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) { 967 CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0), 968 SocketAddress("::", 5000), 969 true); 970 } 971 972 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) { 973 CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0), 974 SocketAddress("::", 5000), 975 true); 976 } 977 978 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) { 979 CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0), 980 SocketAddress("0.0.0.0", 5000), 981 true); 982 } 983 984 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) { 985 CrossFamilyDatagramTest(SocketAddress("::2", 0), 986 SocketAddress("0.0.0.0", 5000), 987 false); 988 } 989 990 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) { 991 CrossFamilyDatagramTest(SocketAddress("::2", 0), 992 SocketAddress("::ffff:127.0.0.1", 5000), 993 false); 994 } 995 996 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) { 997 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 998 SocketAddress("::", 5000), 999 true); 1000 } 1001 1002 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) { 1003 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0), 1004 SocketAddress("::1", 5000), 1005 false); 1006 } 1007 1008 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) { 1009 CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0), 1010 SocketAddress("::ffff:127.0.0.2", 5000), 1011 true); 1012 } 1013 1014 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) { 1015 CrossFamilyDatagramTest(SocketAddress("::", 0), 1016 SocketAddress("0.0.0.0", 5000), 1017 true); 1018 } 1019 1020 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) { 1021 const uint32_t kTestMean[] = {10, 100, 333, 1000}; 1022 const double kTestDev[] = { 0.25, 0.1, 0.01 }; 1023 // TODO: The current code only works for 1000 data points or more. 1024 const uint32_t kTestSamples[] = {/*10, 100,*/ 1000}; 1025 for (size_t midx = 0; midx < arraysize(kTestMean); ++midx) { 1026 for (size_t didx = 0; didx < arraysize(kTestDev); ++didx) { 1027 for (size_t sidx = 0; sidx < arraysize(kTestSamples); ++sidx) { 1028 ASSERT_LT(0u, kTestSamples[sidx]); 1029 const uint32_t kStdDev = 1030 static_cast<uint32_t>(kTestDev[didx] * kTestMean[midx]); 1031 VirtualSocketServer::Function* f = 1032 VirtualSocketServer::CreateDistribution(kTestMean[midx], 1033 kStdDev, 1034 kTestSamples[sidx]); 1035 ASSERT_TRUE(NULL != f); 1036 ASSERT_EQ(kTestSamples[sidx], f->size()); 1037 double sum = 0; 1038 for (uint32_t i = 0; i < f->size(); ++i) { 1039 sum += (*f)[i].second; 1040 } 1041 const double mean = sum / f->size(); 1042 double sum_sq_dev = 0; 1043 for (uint32_t i = 0; i < f->size(); ++i) { 1044 double dev = (*f)[i].second - mean; 1045 sum_sq_dev += dev * dev; 1046 } 1047 const double stddev = sqrt(sum_sq_dev / f->size()); 1048 EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx]) 1049 << "M=" << kTestMean[midx] 1050 << " SD=" << kStdDev 1051 << " N=" << kTestSamples[sidx]; 1052 EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev) 1053 << "M=" << kTestMean[midx] 1054 << " SD=" << kStdDev 1055 << " N=" << kTestSamples[sidx]; 1056 delete f; 1057 } 1058 } 1059 } 1060 } 1061