Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2006, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifdef POSIX
#include <netinet/in.h>
#endif
#include <algorithm>
#include <cmath>

#include "talk/base/logging.h"
#include "talk/base/gunit.h"
#include "talk/base/testclient.h"
#include "talk/base/testutils.h"
#include "talk/base/thread.h"
#include "talk/base/timeutils.h"
#include "talk/base/virtualsocketserver.h"
     41 
     42 using namespace talk_base;
     43 
     44 // Sends at a constant rate but with random packet sizes.
     45 struct Sender : public MessageHandler {
     46   Sender(Thread* th, AsyncSocket* s, uint32 rt)
     47       : thread(th), socket(new AsyncUDPSocket(s)),
     48         done(false), rate(rt), count(0) {
     49     last_send = Time();
     50     thread->PostDelayed(NextDelay(), this, 1);
     51   }
     52 
     53   uint32 NextDelay() {
     54     uint32 size = (rand() % 4096) + 1;
     55     return 1000 * size / rate;
     56   }
     57 
     58   void OnMessage(Message* pmsg) {
     59     ASSERT_EQ(1u, pmsg->message_id);
     60 
     61     if (done)
     62       return;
     63 
     64     uint32 cur_time = Time();
     65     uint32 delay = cur_time - last_send;
     66     uint32 size = rate * delay / 1000;
     67     size = std::min<uint32>(size, 4096);
     68     size = std::max<uint32>(size, sizeof(uint32));
     69 
     70     count += size;
     71     memcpy(dummy, &cur_time, sizeof(cur_time));
     72     socket->Send(dummy, size);
     73 
     74     last_send = cur_time;
     75     thread->PostDelayed(NextDelay(), this, 1);
     76   }
     77 
     78   Thread* thread;
     79   scoped_ptr<AsyncUDPSocket> socket;
     80   bool done;
     81   uint32 rate;  // bytes per second
     82   uint32 count;
     83   uint32 last_send;
     84   char dummy[4096];
     85 };
     86 
     87 struct Receiver : public MessageHandler, public sigslot::has_slots<> {
     88   Receiver(Thread* th, AsyncSocket* s, uint32 bw)
     89       : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
     90         count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
     91     socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
     92     thread->PostDelayed(1000, this, 1);
     93   }
     94 
     95   ~Receiver() {
     96     thread->Clear(this);
     97   }
     98 
     99   void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
    100                     const SocketAddress& remote_addr) {
    101     ASSERT_EQ(socket.get(), s);
    102     ASSERT_GE(size, 4U);
    103 
    104     count += size;
    105     sec_count += size;
    106 
    107     uint32 send_time = *reinterpret_cast<const uint32*>(data);
    108     uint32 recv_time = Time();
    109     uint32 delay = recv_time - send_time;
    110     sum += delay;
    111     sum_sq += delay * delay;
    112     samples += 1;
    113   }
    114 
    115   void OnMessage(Message* pmsg) {
    116     ASSERT_EQ(1u, pmsg->message_id);
    117 
    118     if (done)
    119       return;
    120 
    121     // It is always possible for us to receive more than expected because
    122     // packets can be further delayed in delivery.
    123     if (bandwidth > 0)
    124       ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
    125     sec_count = 0;
    126     thread->PostDelayed(1000, this, 1);
    127   }
    128 
    129   Thread* thread;
    130   scoped_ptr<AsyncUDPSocket> socket;
    131   uint32 bandwidth;
    132   bool done;
    133   size_t count;
    134   size_t sec_count;
    135   double sum;
    136   double sum_sq;
    137   uint32 samples;
    138 };
    139 
    140 class VirtualSocketServerTest : public testing::Test {
    141  public:
    142   VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
    143                               kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
    144                               kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
    145   }
    146 
    147   void CheckAddressIncrementalization(const SocketAddress& post,
    148                                       const SocketAddress& pre) {
    149     EXPECT_EQ(post.port(), pre.port() + 1);
    150     IPAddress post_ip = post.ipaddr();
    151     IPAddress pre_ip = pre.ipaddr();
    152     EXPECT_EQ(pre_ip.family(), post_ip.family());
    153     if (post_ip.family() == AF_INET) {
    154       in_addr pre_ipv4 = pre_ip.ipv4_address();
    155       in_addr post_ipv4 = post_ip.ipv4_address();
    156       int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
    157       EXPECT_EQ(1, difference);
    158     } else if (post_ip.family() == AF_INET6) {
    159       in6_addr post_ip6 = post_ip.ipv6_address();
    160       in6_addr pre_ip6 = pre_ip.ipv6_address();
    161       uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
    162       uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
    163       EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
    164     }
    165   }
    166 
    167   void BasicTest(const SocketAddress& initial_addr) {
    168     AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
    169                                                  SOCK_DGRAM);
    170     socket->Bind(initial_addr);
    171     SocketAddress server_addr = socket->GetLocalAddress();
    172     // Make sure VSS didn't switch families on us.
    173     EXPECT_EQ(server_addr.family(), initial_addr.family());
    174 
    175     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
    176     AsyncSocket* socket2 =
    177         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    178     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
    179 
    180     SocketAddress client2_addr;
    181     EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
    182     EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
    183 
    184     SocketAddress client1_addr;
    185     EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
    186     EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
    187     EXPECT_EQ(client1_addr, server_addr);
    188 
    189     SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
    190     for (int i = 0; i < 10; i++) {
    191       client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
    192 
    193       SocketAddress next_client2_addr;
    194       EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
    195       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
    196       CheckAddressIncrementalization(next_client2_addr, client2_addr);
    197       // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
    198 
    199       SocketAddress server_addr2;
    200       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
    201       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
    202       EXPECT_EQ(server_addr2, server_addr);
    203 
    204       client2_addr = next_client2_addr;
    205     }
    206   }
    207 
    208   // initial_addr should be made from either INADDR_ANY or in6addr_any.
    209   void ConnectTest(const SocketAddress& initial_addr) {
    210     testing::StreamSink sink;
    211     SocketAddress accept_addr;
    212     const SocketAddress kEmptyAddr =
    213         EmptySocketAddressWithFamily(initial_addr.family());
    214 
    215     // Create client
    216     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
    217                                                  SOCK_STREAM);
    218     sink.Monitor(client);
    219     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    220     EXPECT_TRUE(client->GetLocalAddress().IsNil());
    221 
    222     // Create server
    223     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
    224                                                  SOCK_STREAM);
    225     sink.Monitor(server);
    226     EXPECT_NE(0, server->Listen(5));  // Bind required
    227     EXPECT_EQ(0, server->Bind(initial_addr));
    228     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    229     EXPECT_EQ(0, server->Listen(5));
    230     EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
    231 
    232     // No pending server connections
    233     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    234     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
    235     EXPECT_EQ(AF_UNSPEC, accept_addr.family());
    236 
    237     // Attempt connect to listening socket
    238     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    239     EXPECT_NE(client->GetLocalAddress(), kEmptyAddr);  // Implicit Bind
    240     EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family());  // Implicit Bind
    241     EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
    242 
    243     // Client is connecting
    244     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    245     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    246     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    247 
    248     ss_->ProcessMessagesUntilIdle();
    249 
    250     // Client still connecting
    251     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    252     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    253     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    254 
    255     // Server has pending connection
    256     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    257     Socket* accepted = server->Accept(&accept_addr);
    258     EXPECT_TRUE(NULL != accepted);
    259     EXPECT_NE(accept_addr, kEmptyAddr);
    260     EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
    261 
    262     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
    263     EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
    264     EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
    265 
    266     ss_->ProcessMessagesUntilIdle();
    267 
    268     // Client has connected
    269     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
    270     EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
    271     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    272     EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
    273     EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
    274   }
    275 
    276   void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
    277     testing::StreamSink sink;
    278     SocketAddress accept_addr;
    279     const SocketAddress nil_addr;
    280     const SocketAddress empty_addr =
    281         EmptySocketAddressWithFamily(initial_addr.family());
    282 
    283     // Create client
    284     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
    285                                                  SOCK_STREAM);
    286     sink.Monitor(client);
    287 
    288     // Create server
    289     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
    290                                                  SOCK_STREAM);
    291     sink.Monitor(server);
    292     EXPECT_EQ(0, server->Bind(initial_addr));
    293     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    294     // Attempt connect to non-listening socket
    295     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    296 
    297     ss_->ProcessMessagesUntilIdle();
    298 
    299     // No pending server connections
    300     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    301     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
    302     EXPECT_EQ(accept_addr, nil_addr);
    303 
    304     // Connection failed
    305     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    306     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    307     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
    308     EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
    309   }
    310 
    311   void CloseDuringConnectTest(const SocketAddress& initial_addr) {
    312     testing::StreamSink sink;
    313     SocketAddress accept_addr;
    314     const SocketAddress empty_addr =
    315         EmptySocketAddressWithFamily(initial_addr.family());
    316 
    317     // Create client and server
    318     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
    319                                                  SOCK_STREAM);
    320     sink.Monitor(client);
    321     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
    322                                                  SOCK_STREAM);
    323     sink.Monitor(server);
    324 
    325     // Initiate connect
    326     EXPECT_EQ(0, server->Bind(initial_addr));
    327     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    328 
    329     EXPECT_EQ(0, server->Listen(5));
    330     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    331 
    332     // Server close before socket enters accept queue
    333     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    334     server->Close();
    335 
    336     ss_->ProcessMessagesUntilIdle();
    337 
    338     // Result: connection failed
    339     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    340     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
    341 
    342     // New server
    343     delete server;
    344     server = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    345     sink.Monitor(server);
    346 
    347     // Initiate connect
    348     EXPECT_EQ(0, server->Bind(initial_addr));
    349     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    350 
    351     EXPECT_EQ(0, server->Listen(5));
    352     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    353 
    354     ss_->ProcessMessagesUntilIdle();
    355 
    356     // Server close while socket is in accept queue
    357     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    358     server->Close();
    359 
    360     ss_->ProcessMessagesUntilIdle();
    361 
    362     // Result: connection failed
    363     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    364     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
    365 
    366     // New server
    367     delete server;
    368     server = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    369     sink.Monitor(server);
    370 
    371     // Initiate connect
    372     EXPECT_EQ(0, server->Bind(initial_addr));
    373     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    374 
    375     EXPECT_EQ(0, server->Listen(5));
    376     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    377 
    378     ss_->ProcessMessagesUntilIdle();
    379 
    380     // Server accepts connection
    381     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    382     AsyncSocket* accepted = server->Accept(&accept_addr);
    383     ASSERT_TRUE(NULL != accepted);
    384     sink.Monitor(accepted);
    385 
    386     // Client closes before connection complets
    387     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
    388 
    389     // Connected message has not been processed yet.
    390     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    391     client->Close();
    392 
    393     ss_->ProcessMessagesUntilIdle();
    394 
    395     // Result: accepted socket closes
    396     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
    397     EXPECT_TRUE(sink.Check(accepted, testing::SSE_CLOSE));
    398     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    399   }
    400 
    401   void CloseTest(const SocketAddress& initial_addr) {
    402     testing::StreamSink sink;
    403     const SocketAddress kEmptyAddr;
    404 
    405     // Create clients
    406     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    407     sink.Monitor(a);
    408     a->Bind(initial_addr);
    409     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    410 
    411 
    412     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    413     sink.Monitor(b);
    414     b->Bind(initial_addr);
    415     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    416 
    417     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    418     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    419 
    420     ss_->ProcessMessagesUntilIdle();
    421 
    422     EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
    423     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
    424     EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
    425 
    426     EXPECT_TRUE(sink.Check(b, testing::SSE_OPEN));
    427     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
    428     EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
    429 
    430     EXPECT_EQ(1, a->Send("a", 1));
    431     b->Close();
    432     EXPECT_EQ(1, a->Send("b", 1));
    433 
    434     ss_->ProcessMessagesUntilIdle();
    435 
    436     char buffer[10];
    437     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
    438     EXPECT_EQ(-1, b->Recv(buffer, 10));
    439 
    440     EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
    441     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
    442     EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
    443 
    444     EXPECT_FALSE(sink.Check(b, testing::SSE_CLOSE));  // No signal for Closer
    445     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
    446     EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
    447   }
    448 
    449   void TcpSendTest(const SocketAddress& initial_addr) {
    450     testing::StreamSink sink;
    451     const SocketAddress kEmptyAddr;
    452 
    453     // Connect two sockets
    454     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    455     sink.Monitor(a);
    456     a->Bind(initial_addr);
    457     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    458 
    459     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    460     sink.Monitor(b);
    461     b->Bind(initial_addr);
    462     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    463 
    464     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    465     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    466 
    467     ss_->ProcessMessagesUntilIdle();
    468 
    469     const size_t kBufferSize = 2000;
    470     ss_->set_send_buffer_capacity(kBufferSize);
    471     ss_->set_recv_buffer_capacity(kBufferSize);
    472 
    473     const size_t kDataSize = 5000;
    474     char send_buffer[kDataSize], recv_buffer[kDataSize];
    475     for (size_t i = 0; i < kDataSize; ++i)
    476       send_buffer[i] = static_cast<char>(i % 256);
    477     memset(recv_buffer, 0, sizeof(recv_buffer));
    478     size_t send_pos = 0, recv_pos = 0;
    479 
    480     // Can't send more than send buffer in one write
    481     int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    482     EXPECT_EQ(static_cast<int>(kBufferSize), result);
    483     send_pos += result;
    484 
    485     ss_->ProcessMessagesUntilIdle();
    486     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
    487     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    488 
    489     // Receive buffer is already filled, fill send buffer again
    490     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    491     EXPECT_EQ(static_cast<int>(kBufferSize), result);
    492     send_pos += result;
    493 
    494     ss_->ProcessMessagesUntilIdle();
    495     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
    496     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
    497 
    498     // No more room in send or receive buffer
    499     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    500     EXPECT_EQ(-1, result);
    501     EXPECT_TRUE(a->IsBlocking());
    502 
    503     // Read a subset of the data
    504     result = b->Recv(recv_buffer + recv_pos, 500);
    505     EXPECT_EQ(500, result);
    506     recv_pos += result;
    507 
    508     ss_->ProcessMessagesUntilIdle();
    509     EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
    510     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    511 
    512     // Room for more on the sending side
    513     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    514     EXPECT_EQ(500, result);
    515     send_pos += result;
    516 
    517     // Empty the recv buffer
    518     while (true) {
    519       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    520       if (result < 0) {
    521         EXPECT_EQ(-1, result);
    522         EXPECT_TRUE(b->IsBlocking());
    523         break;
    524       }
    525       recv_pos += result;
    526     }
    527 
    528     ss_->ProcessMessagesUntilIdle();
    529     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    530 
    531     // Continue to empty the recv buffer
    532     while (true) {
    533       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    534       if (result < 0) {
    535         EXPECT_EQ(-1, result);
    536         EXPECT_TRUE(b->IsBlocking());
    537         break;
    538       }
    539       recv_pos += result;
    540     }
    541 
    542     // Send last of the data
    543     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    544     EXPECT_EQ(500, result);
    545     send_pos += result;
    546 
    547     ss_->ProcessMessagesUntilIdle();
    548     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    549 
    550     // Receive the last of the data
    551     while (true) {
    552       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    553       if (result < 0) {
    554         EXPECT_EQ(-1, result);
    555         EXPECT_TRUE(b->IsBlocking());
    556         break;
    557       }
    558       recv_pos += result;
    559     }
    560 
    561     ss_->ProcessMessagesUntilIdle();
    562     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
    563 
    564     // The received data matches the sent data
    565     EXPECT_EQ(kDataSize, send_pos);
    566     EXPECT_EQ(kDataSize, recv_pos);
    567     EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
    568   }
    569 
    570   void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
    571     const SocketAddress kEmptyAddr;
    572 
    573     // Connect two sockets
    574     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
    575                                             SOCK_STREAM);
    576     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
    577                                             SOCK_STREAM);
    578     a->Bind(initial_addr);
    579     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    580 
    581     b->Bind(initial_addr);
    582     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    583 
    584     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    585     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    586     ss_->ProcessMessagesUntilIdle();
    587 
    588     // First, deliver all packets in 0 ms.
    589     char buffer[2] = { 0, 0 };
    590     const char cNumPackets = 10;
    591     for (char i = 0; i < cNumPackets; ++i) {
    592       buffer[0] = '0' + i;
    593       EXPECT_EQ(1, a->Send(buffer, 1));
    594     }
    595 
    596     ss_->ProcessMessagesUntilIdle();
    597 
    598     for (char i = 0; i < cNumPackets; ++i) {
    599       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
    600       EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
    601     }
    602 
    603     // Next, deliver packets at random intervals
    604     const uint32 mean = 50;
    605     const uint32 stddev = 50;
    606 
    607     ss_->set_delay_mean(mean);
    608     ss_->set_delay_stddev(stddev);
    609     ss_->UpdateDelayDistribution();
    610 
    611     for (char i = 0; i < cNumPackets; ++i) {
    612       buffer[0] = 'A' + i;
    613       EXPECT_EQ(1, a->Send(buffer, 1));
    614     }
    615 
    616     ss_->ProcessMessagesUntilIdle();
    617 
    618     for (char i = 0; i < cNumPackets; ++i) {
    619       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
    620       EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
    621     }
    622   }
    623 
    624   void BandwidthTest(const SocketAddress& initial_addr) {
    625     AsyncSocket* send_socket =
    626         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    627     AsyncSocket* recv_socket =
    628         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    629     ASSERT_EQ(0, send_socket->Bind(initial_addr));
    630     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
    631     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
    632     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
    633     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
    634 
    635     uint32 bandwidth = 64 * 1024;
    636     ss_->set_bandwidth(bandwidth);
    637 
    638     Thread* pthMain = Thread::Current();
    639     Sender sender(pthMain, send_socket, 80 * 1024);
    640     Receiver receiver(pthMain, recv_socket, bandwidth);
    641 
    642     pthMain->ProcessMessages(5000);
    643     sender.done = true;
    644     pthMain->ProcessMessages(5000);
    645 
    646     ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
    647     ASSERT_TRUE(receiver.count <= 6 * bandwidth);  // queue could drain for 1s
    648 
    649     ss_->set_bandwidth(0);
    650   }
    651 
    652   void DelayTest(const SocketAddress& initial_addr) {
    653     time_t seed = ::time(NULL);
    654     LOG(LS_VERBOSE) << "seed = " << seed;
    655     srand(static_cast<unsigned int>(seed));
    656 
    657     const uint32 mean = 2000;
    658     const uint32 stddev = 500;
    659 
    660     ss_->set_delay_mean(mean);
    661     ss_->set_delay_stddev(stddev);
    662     ss_->UpdateDelayDistribution();
    663 
    664     AsyncSocket* send_socket =
    665         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    666     AsyncSocket* recv_socket =
    667         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    668     ASSERT_EQ(0, send_socket->Bind(initial_addr));
    669     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
    670     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
    671     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
    672     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
    673 
    674     Thread* pthMain = Thread::Current();
    675     // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
    676     // 1000 packets, which is necessary to get a good distribution.
    677     Sender sender(pthMain, send_socket, 100 * 2 * 1024);
    678     Receiver receiver(pthMain, recv_socket, 0);
    679 
    680     pthMain->ProcessMessages(10000);
    681     sender.done = receiver.done = true;
    682     ss_->ProcessMessagesUntilIdle();
    683 
    684     const double sample_mean = receiver.sum / receiver.samples;
    685     double num =
    686         receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
    687     double den = receiver.samples * (receiver.samples - 1);
    688     const double sample_stddev = std::sqrt(num / den);
    689     LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
    690 
    691     EXPECT_LE(500u, receiver.samples);
    692     // We initially used a 0.1 fudge factor, but on the build machine, we
    693     // have seen the value differ by as much as 0.13.
    694     EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
    695     EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
    696 
    697     ss_->set_delay_mean(0);
    698     ss_->set_delay_stddev(0);
    699     ss_->UpdateDelayDistribution();
    700   }
    701 
    702   // Test cross-family communication between a client bound to client_addr and a
    703   // server bound to server_addr. shouldSucceed indicates if communication is
    704   // expected to work or not.
    705   void CrossFamilyConnectionTest(const SocketAddress& client_addr,
    706                                  const SocketAddress& server_addr,
    707                                  bool shouldSucceed) {
    708     testing::StreamSink sink;
    709     SocketAddress accept_address;
    710     const SocketAddress kEmptyAddr;
    711 
    712     // Client gets a IPv4 address
    713     AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
    714                                                  SOCK_STREAM);
    715     sink.Monitor(client);
    716     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    717     EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
    718     client->Bind(client_addr);
    719 
    720     // Server gets a non-mapped non-any IPv6 address.
    721     // IPv4 sockets should not be able to connect to this.
    722     AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
    723                                                  SOCK_STREAM);
    724     sink.Monitor(server);
    725     server->Bind(server_addr);
    726     server->Listen(5);
    727 
    728     if (shouldSucceed) {
    729       EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    730       ss_->ProcessMessagesUntilIdle();
    731       EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    732       Socket* accepted = server->Accept(&accept_address);
    733       EXPECT_TRUE(NULL != accepted);
    734       EXPECT_NE(kEmptyAddr, accept_address);
    735       ss_->ProcessMessagesUntilIdle();
    736       EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
    737       EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
    738     } else {
    739       // Check that the connection failed.
    740       EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
    741       ss_->ProcessMessagesUntilIdle();
    742 
    743       EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    744       EXPECT_TRUE(NULL == server->Accept(&accept_address));
    745       EXPECT_EQ(accept_address, kEmptyAddr);
    746       EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    747       EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    748       EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
    749     }
    750   }
    751 
    752   // Test cross-family datagram sending between a client bound to client_addr
    753   // and a server bound to server_addr. shouldSucceed indicates if sending is
  // expected to succeed or not.
    755   void CrossFamilyDatagramTest(const SocketAddress& client_addr,
    756                                const SocketAddress& server_addr,
    757                                bool shouldSucceed) {
    758     AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
    759     socket->Bind(server_addr);
    760     SocketAddress bound_server_addr = socket->GetLocalAddress();
    761     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
    762 
    763     AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
    764     socket2->Bind(client_addr);
    765     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
    766     SocketAddress client2_addr;
    767 
    768     if (shouldSucceed) {
    769       EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
    770       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
    771       SocketAddress client1_addr;
    772       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
    773       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
    774       EXPECT_EQ(client1_addr, bound_server_addr);
    775     } else {
    776       EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
    777       EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
    778     }
    779   }
    780 
    781  protected:
    782   virtual void SetUp() {
    783     Thread::Current()->set_socketserver(ss_);
    784   }
    785   virtual void TearDown() {
    786     Thread::Current()->set_socketserver(NULL);
    787   }
    788 
    789   VirtualSocketServer* ss_;
    790   const SocketAddress kIPv4AnyAddress;
    791   const SocketAddress kIPv6AnyAddress;
    792 };
    793 
    794 TEST_F(VirtualSocketServerTest, basic_v4) {
    795   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
    796   BasicTest(ipv4_test_addr);
    797 }
    798 
    799 TEST_F(VirtualSocketServerTest, basic_v6) {
    800   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
    801   BasicTest(ipv6_test_addr);
    802 }
    803 
    804 TEST_F(VirtualSocketServerTest, connect_v4) {
    805   ConnectTest(kIPv4AnyAddress);
    806 }
    807 
    808 TEST_F(VirtualSocketServerTest, connect_v6) {
    809   ConnectTest(kIPv6AnyAddress);
    810 }
    811 
    812 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
    813   ConnectToNonListenerTest(kIPv4AnyAddress);
    814 }
    815 
    816 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
    817   ConnectToNonListenerTest(kIPv6AnyAddress);
    818 }
    819 
    820 TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
    821   CloseDuringConnectTest(kIPv4AnyAddress);
    822 }
    823 
    824 TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
    825   CloseDuringConnectTest(kIPv6AnyAddress);
    826 }
    827 
    828 TEST_F(VirtualSocketServerTest, close_v4) {
    829   CloseTest(kIPv4AnyAddress);
    830 }
    831 
    832 TEST_F(VirtualSocketServerTest, close_v6) {
    833   CloseTest(kIPv6AnyAddress);
    834 }
    835 
    836 TEST_F(VirtualSocketServerTest, tcp_send_v4) {
    837   TcpSendTest(kIPv4AnyAddress);
    838 }
    839 
    840 TEST_F(VirtualSocketServerTest, tcp_send_v6) {
    841   TcpSendTest(kIPv6AnyAddress);
    842 }
    843 
    844 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
    845   TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
    846 }
    847 
    848 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
    849   TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
    850 }
    851 
    852 TEST_F(VirtualSocketServerTest, bandwidth_v4) {
    853   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
    854   BandwidthTest(ipv4_test_addr);
    855 }
    856 
    857 TEST_F(VirtualSocketServerTest, bandwidth_v6) {
    858   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
    859   BandwidthTest(ipv6_test_addr);
    860 }
    861 
    862 TEST_F(VirtualSocketServerTest, delay_v4) {
    863   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
    864   DelayTest(ipv4_test_addr);
    865 }
    866 
    867 TEST_F(VirtualSocketServerTest, delay_v6) {
    868   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
    869   DelayTest(ipv6_test_addr);
    870 }
    871 
    872 // Works, receiving socket sees 127.0.0.2.
    873 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
    874   CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
    875                             SocketAddress("0.0.0.0", 5000),
    876                             true);
    877 }
    878 
    879 // Fails.
    880 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
    881   CrossFamilyConnectionTest(SocketAddress("::2", 0),
    882                             SocketAddress("0.0.0.0", 5000),
    883                             false);
    884 }
    885 
    886 // Fails.
    887 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
    888   CrossFamilyConnectionTest(SocketAddress("::2", 0),
    889                             SocketAddress("::ffff:127.0.0.1", 5000),
    890                             false);
    891 }
    892 
    893 // Works. receiving socket sees ::ffff:127.0.0.2.
    894 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
    895   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
    896                             SocketAddress("::", 5000),
    897                             true);
    898 }
    899 
    900 // Fails.
    901 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
    902   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
    903                             SocketAddress("::1", 5000),
    904                             false);
    905 }
    906 
    907 // Works. Receiving socket sees ::ffff:127.0.0.1.
    908 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
    909   CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
    910                             SocketAddress("::ffff:127.0.0.2", 5000),
    911                             true);
    912 }
    913 
    914 // Works, receiving socket sees a result from GetNextIP.
    915 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
    916   CrossFamilyConnectionTest(SocketAddress("::", 0),
    917                             SocketAddress("0.0.0.0", 5000),
    918                             true);
    919 }
    920 
    921 // Works, receiving socket sees whatever GetNextIP gave the client.
    922 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
    923   CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
    924                             SocketAddress("::", 5000),
    925                             true);
    926 }
    927 
    928 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
    929   CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
    930                           SocketAddress("::", 5000),
    931                           true);
    932 }
    933 
    934 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
    935   CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
    936                           SocketAddress("0.0.0.0", 5000),
    937                           true);
    938 }
    939 
    940 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
    941   CrossFamilyDatagramTest(SocketAddress("::2", 0),
    942                           SocketAddress("0.0.0.0", 5000),
    943                           false);
    944 }
    945 
    946 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
    947   CrossFamilyDatagramTest(SocketAddress("::2", 0),
    948                           SocketAddress("::ffff:127.0.0.1", 5000),
    949                           false);
    950 }
    951 
    952 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
    953   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
    954                           SocketAddress("::", 5000),
    955                           true);
    956 }
    957 
    958 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
    959   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
    960                           SocketAddress("::1", 5000),
    961                           false);
    962 }
    963 
    964 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
    965   CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
    966                           SocketAddress("::ffff:127.0.0.2", 5000),
    967                           true);
    968 }
    969 
    970 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
    971   CrossFamilyDatagramTest(SocketAddress("::", 0),
    972                           SocketAddress("0.0.0.0", 5000),
    973                           true);
    974 }
    975 
    976 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
    977   const uint32 kTestMean[] = { 10, 100, 333, 1000 };
    978   const double kTestDev[] = { 0.25, 0.1, 0.01 };
    979   // TODO: The current code only works for 1000 data points or more.
    980   const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
    981   for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
    982     for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
    983       for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
    984         ASSERT_LT(0u, kTestSamples[sidx]);
    985         const uint32 kStdDev =
    986             static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
    987         VirtualSocketServer::Function* f =
    988             VirtualSocketServer::CreateDistribution(kTestMean[midx],
    989                                                     kStdDev,
    990                                                     kTestSamples[sidx]);
    991         ASSERT_TRUE(NULL != f);
    992         ASSERT_EQ(kTestSamples[sidx], f->size());
    993         double sum = 0;
    994         for (uint32 i = 0; i < f->size(); ++i) {
    995           sum += (*f)[i].second;
    996         }
    997         const double mean = sum / f->size();
    998         double sum_sq_dev = 0;
    999         for (uint32 i = 0; i < f->size(); ++i) {
   1000           double dev = (*f)[i].second - mean;
   1001           sum_sq_dev += dev * dev;
   1002         }
   1003         const double stddev = std::sqrt(sum_sq_dev / f->size());
   1004         EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
   1005           << "M=" << kTestMean[midx]
   1006           << " SD=" << kStdDev
   1007           << " N=" << kTestSamples[sidx];
   1008         EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
   1009           << "M=" << kTestMean[midx]
   1010           << " SD=" << kStdDev
   1011           << " N=" << kTestSamples[sidx];
   1012         delete f;
   1013       }
   1014     }
   1015   }
   1016 }
   1017