Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2006 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include <math.h>
     12 #include <time.h>
     13 #if defined(WEBRTC_POSIX)
     14 #include <netinet/in.h>
     15 #endif
     16 
     17 #include "webrtc/base/logging.h"
     18 #include "webrtc/base/gunit.h"
     19 #include "webrtc/base/testclient.h"
     20 #include "webrtc/base/testutils.h"
     21 #include "webrtc/base/thread.h"
     22 #include "webrtc/base/timeutils.h"
     23 #include "webrtc/base/virtualsocketserver.h"
     24 #include "webrtc/test/testsupport/gtest_disable.h"
     25 
     26 using namespace rtc;
     27 
     28 // Sends at a constant rate but with random packet sizes.
     29 struct Sender : public MessageHandler {
     30   Sender(Thread* th, AsyncSocket* s, uint32 rt)
     31       : thread(th), socket(new AsyncUDPSocket(s)),
     32         done(false), rate(rt), count(0) {
     33     last_send = rtc::Time();
     34     thread->PostDelayed(NextDelay(), this, 1);
     35   }
     36 
     37   uint32 NextDelay() {
     38     uint32 size = (rand() % 4096) + 1;
     39     return 1000 * size / rate;
     40   }
     41 
     42   void OnMessage(Message* pmsg) {
     43     ASSERT_EQ(1u, pmsg->message_id);
     44 
     45     if (done)
     46       return;
     47 
     48     uint32 cur_time = rtc::Time();
     49     uint32 delay = cur_time - last_send;
     50     uint32 size = rate * delay / 1000;
     51     size = std::min<uint32>(size, 4096);
     52     size = std::max<uint32>(size, sizeof(uint32));
     53 
     54     count += size;
     55     memcpy(dummy, &cur_time, sizeof(cur_time));
     56     socket->Send(dummy, size, options);
     57 
     58     last_send = cur_time;
     59     thread->PostDelayed(NextDelay(), this, 1);
     60   }
     61 
     62   Thread* thread;
     63   scoped_ptr<AsyncUDPSocket> socket;
     64   rtc::PacketOptions options;
     65   bool done;
     66   uint32 rate;  // bytes per second
     67   uint32 count;
     68   uint32 last_send;
     69   char dummy[4096];
     70 };
     71 
     72 struct Receiver : public MessageHandler, public sigslot::has_slots<> {
     73   Receiver(Thread* th, AsyncSocket* s, uint32 bw)
     74       : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
     75         count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
     76     socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
     77     thread->PostDelayed(1000, this, 1);
     78   }
     79 
     80   ~Receiver() {
     81     thread->Clear(this);
     82   }
     83 
     84   void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
     85                     const SocketAddress& remote_addr,
     86                     const PacketTime& packet_time) {
     87     ASSERT_EQ(socket.get(), s);
     88     ASSERT_GE(size, 4U);
     89 
     90     count += size;
     91     sec_count += size;
     92 
     93     uint32 send_time = *reinterpret_cast<const uint32*>(data);
     94     uint32 recv_time = rtc::Time();
     95     uint32 delay = recv_time - send_time;
     96     sum += delay;
     97     sum_sq += delay * delay;
     98     samples += 1;
     99   }
    100 
    101   void OnMessage(Message* pmsg) {
    102     ASSERT_EQ(1u, pmsg->message_id);
    103 
    104     if (done)
    105       return;
    106 
    107     // It is always possible for us to receive more than expected because
    108     // packets can be further delayed in delivery.
    109     if (bandwidth > 0)
    110       ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
    111     sec_count = 0;
    112     thread->PostDelayed(1000, this, 1);
    113   }
    114 
    115   Thread* thread;
    116   scoped_ptr<AsyncUDPSocket> socket;
    117   uint32 bandwidth;
    118   bool done;
    119   size_t count;
    120   size_t sec_count;
    121   double sum;
    122   double sum_sq;
    123   uint32 samples;
    124 };
    125 
    126 class VirtualSocketServerTest : public testing::Test {
    127  public:
    128   VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
    129                               kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
    130                               kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
    131   }
    132 
    133   void CheckAddressIncrementalization(const SocketAddress& post,
    134                                       const SocketAddress& pre) {
    135     EXPECT_EQ(post.port(), pre.port() + 1);
    136     IPAddress post_ip = post.ipaddr();
    137     IPAddress pre_ip = pre.ipaddr();
    138     EXPECT_EQ(pre_ip.family(), post_ip.family());
    139     if (post_ip.family() == AF_INET) {
    140       in_addr pre_ipv4 = pre_ip.ipv4_address();
    141       in_addr post_ipv4 = post_ip.ipv4_address();
    142       int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
    143       EXPECT_EQ(1, difference);
    144     } else if (post_ip.family() == AF_INET6) {
    145       in6_addr post_ip6 = post_ip.ipv6_address();
    146       in6_addr pre_ip6 = pre_ip.ipv6_address();
    147       uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
    148       uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
    149       EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
    150     }
    151   }
    152 
    153   void BasicTest(const SocketAddress& initial_addr) {
    154     AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
    155                                                  SOCK_DGRAM);
    156     socket->Bind(initial_addr);
    157     SocketAddress server_addr = socket->GetLocalAddress();
    158     // Make sure VSS didn't switch families on us.
    159     EXPECT_EQ(server_addr.family(), initial_addr.family());
    160 
    161     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
    162     AsyncSocket* socket2 =
    163         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    164     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
    165 
    166     SocketAddress client2_addr;
    167     EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
    168     EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
    169 
    170     SocketAddress client1_addr;
    171     EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
    172     EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
    173     EXPECT_EQ(client1_addr, server_addr);
    174 
    175     SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
    176     for (int i = 0; i < 10; i++) {
    177       client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
    178 
    179       SocketAddress next_client2_addr;
    180       EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
    181       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
    182       CheckAddressIncrementalization(next_client2_addr, client2_addr);
    183       // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
    184 
    185       SocketAddress server_addr2;
    186       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
    187       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
    188       EXPECT_EQ(server_addr2, server_addr);
    189 
    190       client2_addr = next_client2_addr;
    191     }
    192   }
    193 
    194   // initial_addr should be made from either INADDR_ANY or in6addr_any.
    195   void ConnectTest(const SocketAddress& initial_addr) {
    196     testing::StreamSink sink;
    197     SocketAddress accept_addr;
    198     const SocketAddress kEmptyAddr =
    199         EmptySocketAddressWithFamily(initial_addr.family());
    200 
    201     // Create client
    202     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
    203                                                  SOCK_STREAM);
    204     sink.Monitor(client);
    205     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    206     EXPECT_TRUE(client->GetLocalAddress().IsNil());
    207 
    208     // Create server
    209     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
    210                                                  SOCK_STREAM);
    211     sink.Monitor(server);
    212     EXPECT_NE(0, server->Listen(5));  // Bind required
    213     EXPECT_EQ(0, server->Bind(initial_addr));
    214     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    215     EXPECT_EQ(0, server->Listen(5));
    216     EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
    217 
    218     // No pending server connections
    219     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    220     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
    221     EXPECT_EQ(AF_UNSPEC, accept_addr.family());
    222 
    223     // Attempt connect to listening socket
    224     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    225     EXPECT_NE(client->GetLocalAddress(), kEmptyAddr);  // Implicit Bind
    226     EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family());  // Implicit Bind
    227     EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
    228 
    229     // Client is connecting
    230     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    231     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    232     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    233 
    234     ss_->ProcessMessagesUntilIdle();
    235 
    236     // Client still connecting
    237     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    238     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    239     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    240 
    241     // Server has pending connection
    242     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    243     Socket* accepted = server->Accept(&accept_addr);
    244     EXPECT_TRUE(NULL != accepted);
    245     EXPECT_NE(accept_addr, kEmptyAddr);
    246     EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
    247 
    248     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
    249     EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
    250     EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
    251 
    252     ss_->ProcessMessagesUntilIdle();
    253 
    254     // Client has connected
    255     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
    256     EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
    257     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    258     EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
    259     EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
    260   }
    261 
    262   void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
    263     testing::StreamSink sink;
    264     SocketAddress accept_addr;
    265     const SocketAddress nil_addr;
    266     const SocketAddress empty_addr =
    267         EmptySocketAddressWithFamily(initial_addr.family());
    268 
    269     // Create client
    270     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
    271                                                  SOCK_STREAM);
    272     sink.Monitor(client);
    273 
    274     // Create server
    275     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
    276                                                  SOCK_STREAM);
    277     sink.Monitor(server);
    278     EXPECT_EQ(0, server->Bind(initial_addr));
    279     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    280     // Attempt connect to non-listening socket
    281     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    282 
    283     ss_->ProcessMessagesUntilIdle();
    284 
    285     // No pending server connections
    286     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    287     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
    288     EXPECT_EQ(accept_addr, nil_addr);
    289 
    290     // Connection failed
    291     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    292     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    293     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
    294     EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
    295   }
    296 
    297   void CloseDuringConnectTest(const SocketAddress& initial_addr) {
    298     testing::StreamSink sink;
    299     SocketAddress accept_addr;
    300     const SocketAddress empty_addr =
    301         EmptySocketAddressWithFamily(initial_addr.family());
    302 
    303     // Create client and server
    304     scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(),
    305                                                           SOCK_STREAM));
    306     sink.Monitor(client.get());
    307     scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(),
    308                                                           SOCK_STREAM));
    309     sink.Monitor(server.get());
    310 
    311     // Initiate connect
    312     EXPECT_EQ(0, server->Bind(initial_addr));
    313     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    314 
    315     EXPECT_EQ(0, server->Listen(5));
    316     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    317 
    318     // Server close before socket enters accept queue
    319     EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
    320     server->Close();
    321 
    322     ss_->ProcessMessagesUntilIdle();
    323 
    324     // Result: connection failed
    325     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    326     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
    327 
    328     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
    329     sink.Monitor(server.get());
    330 
    331     // Initiate connect
    332     EXPECT_EQ(0, server->Bind(initial_addr));
    333     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    334 
    335     EXPECT_EQ(0, server->Listen(5));
    336     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    337 
    338     ss_->ProcessMessagesUntilIdle();
    339 
    340     // Server close while socket is in accept queue
    341     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
    342     server->Close();
    343 
    344     ss_->ProcessMessagesUntilIdle();
    345 
    346     // Result: connection failed
    347     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    348     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
    349 
    350     // New server
    351     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
    352     sink.Monitor(server.get());
    353 
    354     // Initiate connect
    355     EXPECT_EQ(0, server->Bind(initial_addr));
    356     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    357 
    358     EXPECT_EQ(0, server->Listen(5));
    359     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    360 
    361     ss_->ProcessMessagesUntilIdle();
    362 
    363     // Server accepts connection
    364     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
    365     scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
    366     ASSERT_TRUE(NULL != accepted.get());
    367     sink.Monitor(accepted.get());
    368 
    369     // Client closes before connection complets
    370     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
    371 
    372     // Connected message has not been processed yet.
    373     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    374     client->Close();
    375 
    376     ss_->ProcessMessagesUntilIdle();
    377 
    378     // Result: accepted socket closes
    379     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
    380     EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE));
    381     EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
    382   }
    383 
    384   void CloseTest(const SocketAddress& initial_addr) {
    385     testing::StreamSink sink;
    386     const SocketAddress kEmptyAddr;
    387 
    388     // Create clients
    389     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    390     sink.Monitor(a);
    391     a->Bind(initial_addr);
    392     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    393 
    394 
    395     scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(),
    396                                                      SOCK_STREAM));
    397     sink.Monitor(b.get());
    398     b->Bind(initial_addr);
    399     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    400 
    401     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    402     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    403 
    404     ss_->ProcessMessagesUntilIdle();
    405 
    406     EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
    407     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
    408     EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
    409 
    410     EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN));
    411     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
    412     EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
    413 
    414     EXPECT_EQ(1, a->Send("a", 1));
    415     b->Close();
    416     EXPECT_EQ(1, a->Send("b", 1));
    417 
    418     ss_->ProcessMessagesUntilIdle();
    419 
    420     char buffer[10];
    421     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
    422     EXPECT_EQ(-1, b->Recv(buffer, 10));
    423 
    424     EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
    425     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
    426     EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
    427 
    428     // No signal for Closer
    429     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE));
    430     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
    431     EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
    432   }
    433 
    434   void TcpSendTest(const SocketAddress& initial_addr) {
    435     testing::StreamSink sink;
    436     const SocketAddress kEmptyAddr;
    437 
    438     // Connect two sockets
    439     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    440     sink.Monitor(a);
    441     a->Bind(initial_addr);
    442     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    443 
    444     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    445     sink.Monitor(b);
    446     b->Bind(initial_addr);
    447     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    448 
    449     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    450     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    451 
    452     ss_->ProcessMessagesUntilIdle();
    453 
    454     const size_t kBufferSize = 2000;
    455     ss_->set_send_buffer_capacity(kBufferSize);
    456     ss_->set_recv_buffer_capacity(kBufferSize);
    457 
    458     const size_t kDataSize = 5000;
    459     char send_buffer[kDataSize], recv_buffer[kDataSize];
    460     for (size_t i = 0; i < kDataSize; ++i)
    461       send_buffer[i] = static_cast<char>(i % 256);
    462     memset(recv_buffer, 0, sizeof(recv_buffer));
    463     size_t send_pos = 0, recv_pos = 0;
    464 
    465     // Can't send more than send buffer in one write
    466     int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    467     EXPECT_EQ(static_cast<int>(kBufferSize), result);
    468     send_pos += result;
    469 
    470     ss_->ProcessMessagesUntilIdle();
    471     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
    472     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    473 
    474     // Receive buffer is already filled, fill send buffer again
    475     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    476     EXPECT_EQ(static_cast<int>(kBufferSize), result);
    477     send_pos += result;
    478 
    479     ss_->ProcessMessagesUntilIdle();
    480     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
    481     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
    482 
    483     // No more room in send or receive buffer
    484     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    485     EXPECT_EQ(-1, result);
    486     EXPECT_TRUE(a->IsBlocking());
    487 
    488     // Read a subset of the data
    489     result = b->Recv(recv_buffer + recv_pos, 500);
    490     EXPECT_EQ(500, result);
    491     recv_pos += result;
    492 
    493     ss_->ProcessMessagesUntilIdle();
    494     EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
    495     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    496 
    497     // Room for more on the sending side
    498     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    499     EXPECT_EQ(500, result);
    500     send_pos += result;
    501 
    502     // Empty the recv buffer
    503     while (true) {
    504       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    505       if (result < 0) {
    506         EXPECT_EQ(-1, result);
    507         EXPECT_TRUE(b->IsBlocking());
    508         break;
    509       }
    510       recv_pos += result;
    511     }
    512 
    513     ss_->ProcessMessagesUntilIdle();
    514     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    515 
    516     // Continue to empty the recv buffer
    517     while (true) {
    518       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    519       if (result < 0) {
    520         EXPECT_EQ(-1, result);
    521         EXPECT_TRUE(b->IsBlocking());
    522         break;
    523       }
    524       recv_pos += result;
    525     }
    526 
    527     // Send last of the data
    528     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    529     EXPECT_EQ(500, result);
    530     send_pos += result;
    531 
    532     ss_->ProcessMessagesUntilIdle();
    533     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    534 
    535     // Receive the last of the data
    536     while (true) {
    537       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    538       if (result < 0) {
    539         EXPECT_EQ(-1, result);
    540         EXPECT_TRUE(b->IsBlocking());
    541         break;
    542       }
    543       recv_pos += result;
    544     }
    545 
    546     ss_->ProcessMessagesUntilIdle();
    547     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
    548 
    549     // The received data matches the sent data
    550     EXPECT_EQ(kDataSize, send_pos);
    551     EXPECT_EQ(kDataSize, recv_pos);
    552     EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
    553   }
    554 
    555   void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
    556     const SocketAddress kEmptyAddr;
    557 
    558     // Connect two sockets
    559     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
    560                                             SOCK_STREAM);
    561     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
    562                                             SOCK_STREAM);
    563     a->Bind(initial_addr);
    564     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    565 
    566     b->Bind(initial_addr);
    567     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    568 
    569     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    570     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    571     ss_->ProcessMessagesUntilIdle();
    572 
    573     // First, deliver all packets in 0 ms.
    574     char buffer[2] = { 0, 0 };
    575     const char cNumPackets = 10;
    576     for (char i = 0; i < cNumPackets; ++i) {
    577       buffer[0] = '0' + i;
    578       EXPECT_EQ(1, a->Send(buffer, 1));
    579     }
    580 
    581     ss_->ProcessMessagesUntilIdle();
    582 
    583     for (char i = 0; i < cNumPackets; ++i) {
    584       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
    585       EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
    586     }
    587 
    588     // Next, deliver packets at random intervals
    589     const uint32 mean = 50;
    590     const uint32 stddev = 50;
    591 
    592     ss_->set_delay_mean(mean);
    593     ss_->set_delay_stddev(stddev);
    594     ss_->UpdateDelayDistribution();
    595 
    596     for (char i = 0; i < cNumPackets; ++i) {
    597       buffer[0] = 'A' + i;
    598       EXPECT_EQ(1, a->Send(buffer, 1));
    599     }
    600 
    601     ss_->ProcessMessagesUntilIdle();
    602 
    603     for (char i = 0; i < cNumPackets; ++i) {
    604       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
    605       EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
    606     }
    607   }
    608 
    609   void BandwidthTest(const SocketAddress& initial_addr) {
    610     AsyncSocket* send_socket =
    611         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    612     AsyncSocket* recv_socket =
    613         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    614     ASSERT_EQ(0, send_socket->Bind(initial_addr));
    615     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
    616     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
    617     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
    618     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
    619 
    620     uint32 bandwidth = 64 * 1024;
    621     ss_->set_bandwidth(bandwidth);
    622 
    623     Thread* pthMain = Thread::Current();
    624     Sender sender(pthMain, send_socket, 80 * 1024);
    625     Receiver receiver(pthMain, recv_socket, bandwidth);
    626 
    627     pthMain->ProcessMessages(5000);
    628     sender.done = true;
    629     pthMain->ProcessMessages(5000);
    630 
    631     ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
    632     ASSERT_TRUE(receiver.count <= 6 * bandwidth);  // queue could drain for 1s
    633 
    634     ss_->set_bandwidth(0);
    635   }
    636 
    637   void DelayTest(const SocketAddress& initial_addr) {
    638     time_t seed = ::time(NULL);
    639     LOG(LS_VERBOSE) << "seed = " << seed;
    640     srand(static_cast<unsigned int>(seed));
    641 
    642     const uint32 mean = 2000;
    643     const uint32 stddev = 500;
    644 
    645     ss_->set_delay_mean(mean);
    646     ss_->set_delay_stddev(stddev);
    647     ss_->UpdateDelayDistribution();
    648 
    649     AsyncSocket* send_socket =
    650         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    651     AsyncSocket* recv_socket =
    652         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    653     ASSERT_EQ(0, send_socket->Bind(initial_addr));
    654     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
    655     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
    656     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
    657     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
    658 
    659     Thread* pthMain = Thread::Current();
    660     // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
    661     // 1000 packets, which is necessary to get a good distribution.
    662     Sender sender(pthMain, send_socket, 100 * 2 * 1024);
    663     Receiver receiver(pthMain, recv_socket, 0);
    664 
    665     pthMain->ProcessMessages(10000);
    666     sender.done = receiver.done = true;
    667     ss_->ProcessMessagesUntilIdle();
    668 
    669     const double sample_mean = receiver.sum / receiver.samples;
    670     double num =
    671         receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
    672     double den = receiver.samples * (receiver.samples - 1);
    673     const double sample_stddev = sqrt(num / den);
    674     LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
    675 
    676     EXPECT_LE(500u, receiver.samples);
    677     // We initially used a 0.1 fudge factor, but on the build machine, we
    678     // have seen the value differ by as much as 0.13.
    679     EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
    680     EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
    681 
    682     ss_->set_delay_mean(0);
    683     ss_->set_delay_stddev(0);
    684     ss_->UpdateDelayDistribution();
    685   }
    686 
    687   // Test cross-family communication between a client bound to client_addr and a
    688   // server bound to server_addr. shouldSucceed indicates if communication is
    689   // expected to work or not.
    690   void CrossFamilyConnectionTest(const SocketAddress& client_addr,
    691                                  const SocketAddress& server_addr,
    692                                  bool shouldSucceed) {
    693     testing::StreamSink sink;
    694     SocketAddress accept_address;
    695     const SocketAddress kEmptyAddr;
    696 
    697     // Client gets a IPv4 address
    698     AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
    699                                                  SOCK_STREAM);
    700     sink.Monitor(client);
    701     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    702     EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
    703     client->Bind(client_addr);
    704 
    705     // Server gets a non-mapped non-any IPv6 address.
    706     // IPv4 sockets should not be able to connect to this.
    707     AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
    708                                                  SOCK_STREAM);
    709     sink.Monitor(server);
    710     server->Bind(server_addr);
    711     server->Listen(5);
    712 
    713     if (shouldSucceed) {
    714       EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    715       ss_->ProcessMessagesUntilIdle();
    716       EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    717       Socket* accepted = server->Accept(&accept_address);
    718       EXPECT_TRUE(NULL != accepted);
    719       EXPECT_NE(kEmptyAddr, accept_address);
    720       ss_->ProcessMessagesUntilIdle();
    721       EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
    722       EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
    723     } else {
    724       // Check that the connection failed.
    725       EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
    726       ss_->ProcessMessagesUntilIdle();
    727 
    728       EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    729       EXPECT_TRUE(NULL == server->Accept(&accept_address));
    730       EXPECT_EQ(accept_address, kEmptyAddr);
    731       EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    732       EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    733       EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
    734     }
    735   }
    736 
    737   // Sends datagrams between a client bound to client_addr and a server bound
    738   // to server_addr; shouldSucceed says whether sending is expected to succeed.
    739   void CrossFamilyDatagramTest(const SocketAddress& client_addr,
    740                                const SocketAddress& server_addr,
    741                                bool shouldSucceed) {
    742     AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
    743     socket->Bind(server_addr);
    744     SocketAddress bound_server_addr = socket->GetLocalAddress();
    745     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
    746     AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
    747     socket2->Bind(client_addr);
    748     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
    749     if (shouldSucceed) {
    750       SocketAddress client2_addr;
    751       EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
    752       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
    753       SocketAddress client1_addr;
    754       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
    755       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
    756       EXPECT_EQ(client1_addr, bound_server_addr);
    757     } else {
    758       EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
    759       EXPECT_FALSE(client1->CheckNextPacket("foo", 3, NULL));
    760     }
    761     // Both clients were allocated with new above and have no other owner.
    762     delete client2;
    763     delete client1;
    764   }
    765 
    766  protected:
    767   virtual void SetUp() {
    768     Thread::Current()->set_socketserver(ss_);  // Current thread now uses ss_.
    769   }
    770   virtual void TearDown() {
    771     Thread::Current()->set_socketserver(NULL);  // Undo the install from SetUp().
    772   }
    773 
    774   VirtualSocketServer* ss_;  // Installed on the current thread by SetUp().
    775   const SocketAddress kIPv4AnyAddress;  // Passed to the *_v4 tests below.
    776   const SocketAddress kIPv6AnyAddress;  // Passed to the *_v6 tests below.
    777 };
    778 
    779 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(basic_v4)) {
    780   SocketAddress any_v4 = SocketAddress(IPAddress(INADDR_ANY), 5000);
    781   BasicTest(any_v4);  // IPv4 wildcard address (INADDR_ANY), port 5000.
    782 }
    783 
    784 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(basic_v6)) {
    785   SocketAddress any_v6 = SocketAddress(IPAddress(in6addr_any), 5000);
    786   BasicTest(any_v6);  // IPv6 wildcard address (in6addr_any), port 5000.
    787 }
    788 
    789 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_v4)) {
    790   ConnectTest(kIPv4AnyAddress);  // Fixture's IPv4 address; IPv6 twin: connect_v6.
    791 }
    792 
    793 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_v6)) {
    794   ConnectTest(kIPv6AnyAddress);  // Fixture's IPv6 address; IPv4 twin: connect_v4.
    795 }
    796 
    797 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_to_non_listener_v4)) {
    798   ConnectToNonListenerTest(kIPv4AnyAddress);  // Fixture's IPv4 address.
    799 }
    800 
    801 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_to_non_listener_v6)) {
    802   ConnectToNonListenerTest(kIPv6AnyAddress);  // Fixture's IPv6 address.
    803 }
    804 
    805 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(close_during_connect_v4)) {
    806   CloseDuringConnectTest(kIPv4AnyAddress);  // Fixture's IPv4 address.
    807 }
    808 
    809 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(close_during_connect_v6)) {
    810   CloseDuringConnectTest(kIPv6AnyAddress);  // Fixture's IPv6 address.
    811 }
    812 
    813 TEST_F(VirtualSocketServerTest, close_v4) {
    814   CloseTest(kIPv4AnyAddress);  // Fixture's IPv4 address; IPv6 twin: close_v6.
    815 }
    816 
    817 TEST_F(VirtualSocketServerTest, close_v6) {
    818   CloseTest(kIPv6AnyAddress);  // Fixture's IPv6 address; IPv4 twin: close_v4.
    819 }
    820 
    821 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(tcp_send_v4)) {
    822   TcpSendTest(kIPv4AnyAddress);  // Fixture's IPv4 address; IPv6 twin: tcp_send_v6.
    823 }
    824 
    825 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(tcp_send_v6)) {
    826   TcpSendTest(kIPv6AnyAddress);  // Fixture's IPv6 address; IPv4 twin: tcp_send_v4.
    827 }
    828 
    829 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
    830   TcpSendsPacketsInOrderTest(kIPv4AnyAddress);  // Fixture's IPv4 address.
    831 }
    832 
    833 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
    834   TcpSendsPacketsInOrderTest(kIPv6AnyAddress);  // Fixture's IPv6 address.
    835 }
    836 
    837 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(bandwidth_v4)) {
    838   SocketAddress any_v4 = SocketAddress(IPAddress(INADDR_ANY), 1000);
    839   BandwidthTest(any_v4);  // IPv4 wildcard address (INADDR_ANY), port 1000.
    840 }
    841 
    842 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(bandwidth_v6)) {
    843   SocketAddress any_v6 = SocketAddress(IPAddress(in6addr_any), 1000);
    844   BandwidthTest(any_v6);  // IPv6 wildcard address (in6addr_any), port 1000.
    845 }
    846 
    847 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(delay_v4)) {
    848   SocketAddress any_v4 = SocketAddress(IPAddress(INADDR_ANY), 1000);
    849   DelayTest(any_v4);  // IPv4 wildcard address (INADDR_ANY), port 1000.
    850 }
    851 
    852 // Disabled (DISABLED_ prefix). See: https://code.google.com/p/webrtc/issues/detail?id=2409
    853 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) {
    854   SocketAddress any_v6 = SocketAddress(IPAddress(in6addr_any), 1000);
    855   DelayTest(any_v6);  // IPv6 wildcard address (in6addr_any), port 1000.
    856 }
    857 
    858 // Works: the receiving socket sees 127.0.0.2.
    859 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromMappedIPv6ToIPv4Any)) {
    860   const SocketAddress from_addr("::ffff:127.0.0.2", 0);
    861   const SocketAddress to_addr("0.0.0.0", 5000);
    862   CrossFamilyConnectionTest(from_addr, to_addr, true);
    863 }
    864 
    865 // Expected to fail: unmapped IPv6 client, server on the IPv4 any address.
    866 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantConnectFromUnMappedIPv6ToIPv4Any)) {
    867   const SocketAddress from_addr("::2", 0);
    868   const SocketAddress to_addr("0.0.0.0", 5000);
    869   CrossFamilyConnectionTest(from_addr, to_addr, false);
    870 }
    871 
    872 // Expected to fail: unmapped IPv6 client, server on an IPv4-mapped IPv6 address.
    873 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantConnectFromUnMappedIPv6ToMappedIPv6)) {
    874   const SocketAddress from_addr("::2", 0);
    875   const SocketAddress to_addr("::ffff:127.0.0.1", 5000);
    876   CrossFamilyConnectionTest(from_addr, to_addr, false);
    877 }
    878 
    879 // Works: the receiving socket sees ::ffff:127.0.0.2.
    880 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromIPv4ToIPv6Any)) {
    881   const SocketAddress from_addr("127.0.0.2", 0);
    882   const SocketAddress to_addr("::", 5000);
    883   CrossFamilyConnectionTest(from_addr, to_addr, true);
    884 }
    885 
    886 // Expected to fail: IPv4 client, server on an unmapped IPv6 address (::1).
    887 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantConnectFromIPv4ToUnMappedIPv6)) {
    888   const SocketAddress from_addr("127.0.0.2", 0);
    889   const SocketAddress to_addr("::1", 5000);
    890   CrossFamilyConnectionTest(from_addr, to_addr, false);
    891 }
    892 
    893 // Works: the receiving socket sees ::ffff:127.0.0.1.
    894 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromIPv4ToMappedIPv6)) {
    895   const SocketAddress from_addr("127.0.0.1", 0);
    896   const SocketAddress to_addr("::ffff:127.0.0.2", 5000);
    897   CrossFamilyConnectionTest(from_addr, to_addr, true);
    898 }
    899 
    900 // Works: the receiving socket sees a result from GetNextIP.
    901 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromUnboundIPv6ToIPv4Any)) {
    902   const SocketAddress from_addr("::", 0);
    903   const SocketAddress to_addr("0.0.0.0", 5000);
    904   CrossFamilyConnectionTest(from_addr, to_addr, true);
    905 }
    906 
    907 // Works: the receiving socket sees whatever GetNextIP gave the client.
    908 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromUnboundIPv4ToIPv6Any)) {
    909   const SocketAddress from_addr("0.0.0.0", 0);
    910   const SocketAddress to_addr("::", 5000);
    911   CrossFamilyConnectionTest(from_addr, to_addr, true);
    912 }
    913 
    914 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromUnboundIPv4ToIPv6Any)) {
    915   const SocketAddress from_addr("0.0.0.0", 0);
    916   const SocketAddress to_addr("::", 5000);
    917   CrossFamilyDatagramTest(from_addr, to_addr, true);  // Sending must succeed.
    918 }
    919 
    920 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromMappedIPv6ToIPv4Any)) {
    921   const SocketAddress from_addr("::ffff:127.0.0.1", 0);
    922   const SocketAddress to_addr("0.0.0.0", 5000);
    923   CrossFamilyDatagramTest(from_addr, to_addr, true);  // Sending must succeed.
    924 }
    925 
    926 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantSendDatagramFromUnMappedIPv6ToIPv4Any)) {
    927   const SocketAddress from_addr("::2", 0);
    928   const SocketAddress to_addr("0.0.0.0", 5000);
    929   CrossFamilyDatagramTest(from_addr, to_addr, false);  // Sending must fail.
    930 }
    931 
    932 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantSendDatagramFromUnMappedIPv6ToMappedIPv6)) {
    933   const SocketAddress from_addr("::2", 0);
    934   const SocketAddress to_addr("::ffff:127.0.0.1", 5000);
    935   CrossFamilyDatagramTest(from_addr, to_addr, false);  // Sending must fail.
    936 }
    937 
    938 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromIPv4ToIPv6Any)) {
    939   const SocketAddress from_addr("127.0.0.2", 0);
    940   const SocketAddress to_addr("::", 5000);
    941   CrossFamilyDatagramTest(from_addr, to_addr, true);  // Sending must succeed.
    942 }
    943 
    944 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantSendDatagramFromIPv4ToUnMappedIPv6)) {
    945   const SocketAddress from_addr("127.0.0.2", 0);
    946   const SocketAddress to_addr("::1", 5000);
    947   CrossFamilyDatagramTest(from_addr, to_addr, false);  // Sending must fail.
    948 }
    949 
    950 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromIPv4ToMappedIPv6)) {
    951   const SocketAddress from_addr("127.0.0.1", 0);
    952   const SocketAddress to_addr("::ffff:127.0.0.2", 5000);
    953   CrossFamilyDatagramTest(from_addr, to_addr, true);  // Sending must succeed.
    954 }
    955 
    956 TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromUnboundIPv6ToIPv4Any)) {
    957   const SocketAddress from_addr("::", 0);
    958   const SocketAddress to_addr("0.0.0.0", 5000);
    959   CrossFamilyDatagramTest(from_addr, to_addr, true);  // Sending must succeed.
    960 }
    961 
    962 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
    963   // Requested means; each deviation below is a fraction of the mean.
    964   const uint32 kTestMean[] = { 10, 100, 333, 1000 };
    965   const double kTestDev[] = { 0.25, 0.1, 0.01 };
    966   // TODO: The current code only works for 1000 data points or more.
    967   const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
    968   for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
    969     for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
    970       for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
    971         ASSERT_LT(0u, kTestSamples[sidx]);
    972         const uint32 kStdDev =
    973             static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
    974         VirtualSocketServer::Function* f =
    975             VirtualSocketServer::CreateDistribution(
    976                 kTestMean[midx], kStdDev, kTestSamples[sidx]);
    977         ASSERT_TRUE(NULL != f);
    978         ASSERT_EQ(kTestSamples[sidx], f->size());
    979         const VirtualSocketServer::Function& points = *f;
    980         // Two passes over the .second values: mean, then population std dev.
    981         double total = 0;
    982         for (uint32 k = 0; k < points.size(); ++k)
    983           total += points[k].second;
    984         const double mean = total / points.size();
    985         double squared_dev_total = 0;
    986         for (uint32 k = 0; k < points.size(); ++k) {
    987           const double delta = points[k].second - mean;
    988           squared_dev_total += delta * delta;
    989         }
    990         const double stddev = sqrt(squared_dev_total / points.size());
    991         // Both statistics must land within 10% of the requested values.
    992         EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
    993           << "M=" << kTestMean[midx] << " SD=" << kStdDev
    994           << " N=" << kTestSamples[sidx];
    995         EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
    996           << "M=" << kTestMean[midx] << " SD=" << kStdDev
    997           << " N=" << kTestSamples[sidx];
    998         delete f;
    999       }
   1000     }
   1001   }
   1002 }
   1003