Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2006 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include <math.h>
     12 #include <time.h>
     13 #if defined(WEBRTC_POSIX)
     14 #include <netinet/in.h>
     15 #endif
     16 
     17 #include "webrtc/base/logging.h"
     18 #include "webrtc/base/gunit.h"
     19 #include "webrtc/base/testclient.h"
     20 #include "webrtc/base/testutils.h"
     21 #include "webrtc/base/thread.h"
     22 #include "webrtc/base/timeutils.h"
     23 #include "webrtc/base/virtualsocketserver.h"
     24 
     25 using namespace rtc;
     26 
     27 // Sends at a constant rate but with random packet sizes.
     28 struct Sender : public MessageHandler {
     29   Sender(Thread* th, AsyncSocket* s, uint32 rt)
     30       : thread(th), socket(new AsyncUDPSocket(s)),
     31         done(false), rate(rt), count(0) {
     32     last_send = rtc::Time();
     33     thread->PostDelayed(NextDelay(), this, 1);
     34   }
     35 
     36   uint32 NextDelay() {
     37     uint32 size = (rand() % 4096) + 1;
     38     return 1000 * size / rate;
     39   }
     40 
     41   void OnMessage(Message* pmsg) {
     42     ASSERT_EQ(1u, pmsg->message_id);
     43 
     44     if (done)
     45       return;
     46 
     47     uint32 cur_time = rtc::Time();
     48     uint32 delay = cur_time - last_send;
     49     uint32 size = rate * delay / 1000;
     50     size = std::min<uint32>(size, 4096);
     51     size = std::max<uint32>(size, sizeof(uint32));
     52 
     53     count += size;
     54     memcpy(dummy, &cur_time, sizeof(cur_time));
     55     socket->Send(dummy, size, options);
     56 
     57     last_send = cur_time;
     58     thread->PostDelayed(NextDelay(), this, 1);
     59   }
     60 
     61   Thread* thread;
     62   scoped_ptr<AsyncUDPSocket> socket;
     63   rtc::PacketOptions options;
     64   bool done;
     65   uint32 rate;  // bytes per second
     66   uint32 count;
     67   uint32 last_send;
     68   char dummy[4096];
     69 };
     70 
     71 struct Receiver : public MessageHandler, public sigslot::has_slots<> {
     72   Receiver(Thread* th, AsyncSocket* s, uint32 bw)
     73       : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
     74         count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
     75     socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
     76     thread->PostDelayed(1000, this, 1);
     77   }
     78 
     79   ~Receiver() {
     80     thread->Clear(this);
     81   }
     82 
     83   void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
     84                     const SocketAddress& remote_addr,
     85                     const PacketTime& packet_time) {
     86     ASSERT_EQ(socket.get(), s);
     87     ASSERT_GE(size, 4U);
     88 
     89     count += size;
     90     sec_count += size;
     91 
     92     uint32 send_time = *reinterpret_cast<const uint32*>(data);
     93     uint32 recv_time = rtc::Time();
     94     uint32 delay = recv_time - send_time;
     95     sum += delay;
     96     sum_sq += delay * delay;
     97     samples += 1;
     98   }
     99 
    100   void OnMessage(Message* pmsg) {
    101     ASSERT_EQ(1u, pmsg->message_id);
    102 
    103     if (done)
    104       return;
    105 
    106     // It is always possible for us to receive more than expected because
    107     // packets can be further delayed in delivery.
    108     if (bandwidth > 0)
    109       ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
    110     sec_count = 0;
    111     thread->PostDelayed(1000, this, 1);
    112   }
    113 
    114   Thread* thread;
    115   scoped_ptr<AsyncUDPSocket> socket;
    116   uint32 bandwidth;
    117   bool done;
    118   size_t count;
    119   size_t sec_count;
    120   double sum;
    121   double sum_sq;
    122   uint32 samples;
    123 };
    124 
    125 class VirtualSocketServerTest : public testing::Test {
    126  public:
    127   VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
    128                               kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
    129                               kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
    130   }
    131 
    132   void CheckAddressIncrementalization(const SocketAddress& post,
    133                                       const SocketAddress& pre) {
    134     EXPECT_EQ(post.port(), pre.port() + 1);
    135     IPAddress post_ip = post.ipaddr();
    136     IPAddress pre_ip = pre.ipaddr();
    137     EXPECT_EQ(pre_ip.family(), post_ip.family());
    138     if (post_ip.family() == AF_INET) {
    139       in_addr pre_ipv4 = pre_ip.ipv4_address();
    140       in_addr post_ipv4 = post_ip.ipv4_address();
    141       int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
    142       EXPECT_EQ(1, difference);
    143     } else if (post_ip.family() == AF_INET6) {
    144       in6_addr post_ip6 = post_ip.ipv6_address();
    145       in6_addr pre_ip6 = pre_ip.ipv6_address();
    146       uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
    147       uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
    148       EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
    149     }
    150   }
    151 
    152   void BasicTest(const SocketAddress& initial_addr) {
    153     AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
    154                                                  SOCK_DGRAM);
    155     socket->Bind(initial_addr);
    156     SocketAddress server_addr = socket->GetLocalAddress();
    157     // Make sure VSS didn't switch families on us.
    158     EXPECT_EQ(server_addr.family(), initial_addr.family());
    159 
    160     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
    161     AsyncSocket* socket2 =
    162         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    163     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
    164 
    165     SocketAddress client2_addr;
    166     EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
    167     EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
    168 
    169     SocketAddress client1_addr;
    170     EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
    171     EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
    172     EXPECT_EQ(client1_addr, server_addr);
    173 
    174     SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
    175     for (int i = 0; i < 10; i++) {
    176       client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
    177 
    178       SocketAddress next_client2_addr;
    179       EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
    180       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
    181       CheckAddressIncrementalization(next_client2_addr, client2_addr);
    182       // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
    183 
    184       SocketAddress server_addr2;
    185       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
    186       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
    187       EXPECT_EQ(server_addr2, server_addr);
    188 
    189       client2_addr = next_client2_addr;
    190     }
    191   }
    192 
    193   // initial_addr should be made from either INADDR_ANY or in6addr_any.
    194   void ConnectTest(const SocketAddress& initial_addr) {
    195     testing::StreamSink sink;
    196     SocketAddress accept_addr;
    197     const SocketAddress kEmptyAddr =
    198         EmptySocketAddressWithFamily(initial_addr.family());
    199 
    200     // Create client
    201     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
    202                                                  SOCK_STREAM);
    203     sink.Monitor(client);
    204     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    205     EXPECT_TRUE(client->GetLocalAddress().IsNil());
    206 
    207     // Create server
    208     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
    209                                                  SOCK_STREAM);
    210     sink.Monitor(server);
    211     EXPECT_NE(0, server->Listen(5));  // Bind required
    212     EXPECT_EQ(0, server->Bind(initial_addr));
    213     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    214     EXPECT_EQ(0, server->Listen(5));
    215     EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
    216 
    217     // No pending server connections
    218     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    219     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
    220     EXPECT_EQ(AF_UNSPEC, accept_addr.family());
    221 
    222     // Attempt connect to listening socket
    223     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    224     EXPECT_NE(client->GetLocalAddress(), kEmptyAddr);  // Implicit Bind
    225     EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family());  // Implicit Bind
    226     EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
    227 
    228     // Client is connecting
    229     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    230     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    231     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    232 
    233     ss_->ProcessMessagesUntilIdle();
    234 
    235     // Client still connecting
    236     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    237     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    238     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    239 
    240     // Server has pending connection
    241     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    242     Socket* accepted = server->Accept(&accept_addr);
    243     EXPECT_TRUE(NULL != accepted);
    244     EXPECT_NE(accept_addr, kEmptyAddr);
    245     EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
    246 
    247     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
    248     EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
    249     EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
    250 
    251     ss_->ProcessMessagesUntilIdle();
    252 
    253     // Client has connected
    254     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
    255     EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
    256     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
    257     EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
    258     EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
    259   }
    260 
    261   void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
    262     testing::StreamSink sink;
    263     SocketAddress accept_addr;
    264     const SocketAddress nil_addr;
    265     const SocketAddress empty_addr =
    266         EmptySocketAddressWithFamily(initial_addr.family());
    267 
    268     // Create client
    269     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
    270                                                  SOCK_STREAM);
    271     sink.Monitor(client);
    272 
    273     // Create server
    274     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
    275                                                  SOCK_STREAM);
    276     sink.Monitor(server);
    277     EXPECT_EQ(0, server->Bind(initial_addr));
    278     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    279     // Attempt connect to non-listening socket
    280     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    281 
    282     ss_->ProcessMessagesUntilIdle();
    283 
    284     // No pending server connections
    285     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    286     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
    287     EXPECT_EQ(accept_addr, nil_addr);
    288 
    289     // Connection failed
    290     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    291     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    292     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
    293     EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
    294   }
    295 
    296   void CloseDuringConnectTest(const SocketAddress& initial_addr) {
    297     testing::StreamSink sink;
    298     SocketAddress accept_addr;
    299     const SocketAddress empty_addr =
    300         EmptySocketAddressWithFamily(initial_addr.family());
    301 
    302     // Create client and server
    303     scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(),
    304                                                           SOCK_STREAM));
    305     sink.Monitor(client.get());
    306     scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(),
    307                                                           SOCK_STREAM));
    308     sink.Monitor(server.get());
    309 
    310     // Initiate connect
    311     EXPECT_EQ(0, server->Bind(initial_addr));
    312     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    313 
    314     EXPECT_EQ(0, server->Listen(5));
    315     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    316 
    317     // Server close before socket enters accept queue
    318     EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
    319     server->Close();
    320 
    321     ss_->ProcessMessagesUntilIdle();
    322 
    323     // Result: connection failed
    324     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    325     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
    326 
    327     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
    328     sink.Monitor(server.get());
    329 
    330     // Initiate connect
    331     EXPECT_EQ(0, server->Bind(initial_addr));
    332     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    333 
    334     EXPECT_EQ(0, server->Listen(5));
    335     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    336 
    337     ss_->ProcessMessagesUntilIdle();
    338 
    339     // Server close while socket is in accept queue
    340     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
    341     server->Close();
    342 
    343     ss_->ProcessMessagesUntilIdle();
    344 
    345     // Result: connection failed
    346     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    347     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
    348 
    349     // New server
    350     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
    351     sink.Monitor(server.get());
    352 
    353     // Initiate connect
    354     EXPECT_EQ(0, server->Bind(initial_addr));
    355     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
    356 
    357     EXPECT_EQ(0, server->Listen(5));
    358     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    359 
    360     ss_->ProcessMessagesUntilIdle();
    361 
    362     // Server accepts connection
    363     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
    364     scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
    365     ASSERT_TRUE(NULL != accepted.get());
    366     sink.Monitor(accepted.get());
    367 
    368     // Client closes before connection complets
    369     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
    370 
    371     // Connected message has not been processed yet.
    372     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
    373     client->Close();
    374 
    375     ss_->ProcessMessagesUntilIdle();
    376 
    377     // Result: accepted socket closes
    378     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
    379     EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE));
    380     EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
    381   }
    382 
    383   void CloseTest(const SocketAddress& initial_addr) {
    384     testing::StreamSink sink;
    385     const SocketAddress kEmptyAddr;
    386 
    387     // Create clients
    388     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    389     sink.Monitor(a);
    390     a->Bind(initial_addr);
    391     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    392 
    393 
    394     scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(),
    395                                                      SOCK_STREAM));
    396     sink.Monitor(b.get());
    397     b->Bind(initial_addr);
    398     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    399 
    400     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    401     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    402 
    403     ss_->ProcessMessagesUntilIdle();
    404 
    405     EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
    406     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
    407     EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
    408 
    409     EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN));
    410     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
    411     EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
    412 
    413     EXPECT_EQ(1, a->Send("a", 1));
    414     b->Close();
    415     EXPECT_EQ(1, a->Send("b", 1));
    416 
    417     ss_->ProcessMessagesUntilIdle();
    418 
    419     char buffer[10];
    420     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
    421     EXPECT_EQ(-1, b->Recv(buffer, 10));
    422 
    423     EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
    424     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
    425     EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
    426 
    427     // No signal for Closer
    428     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE));
    429     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
    430     EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
    431   }
    432 
    433   void TcpSendTest(const SocketAddress& initial_addr) {
    434     testing::StreamSink sink;
    435     const SocketAddress kEmptyAddr;
    436 
    437     // Connect two sockets
    438     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    439     sink.Monitor(a);
    440     a->Bind(initial_addr);
    441     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    442 
    443     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
    444     sink.Monitor(b);
    445     b->Bind(initial_addr);
    446     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    447 
    448     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    449     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    450 
    451     ss_->ProcessMessagesUntilIdle();
    452 
    453     const size_t kBufferSize = 2000;
    454     ss_->set_send_buffer_capacity(kBufferSize);
    455     ss_->set_recv_buffer_capacity(kBufferSize);
    456 
    457     const size_t kDataSize = 5000;
    458     char send_buffer[kDataSize], recv_buffer[kDataSize];
    459     for (size_t i = 0; i < kDataSize; ++i)
    460       send_buffer[i] = static_cast<char>(i % 256);
    461     memset(recv_buffer, 0, sizeof(recv_buffer));
    462     size_t send_pos = 0, recv_pos = 0;
    463 
    464     // Can't send more than send buffer in one write
    465     int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    466     EXPECT_EQ(static_cast<int>(kBufferSize), result);
    467     send_pos += result;
    468 
    469     ss_->ProcessMessagesUntilIdle();
    470     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
    471     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    472 
    473     // Receive buffer is already filled, fill send buffer again
    474     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    475     EXPECT_EQ(static_cast<int>(kBufferSize), result);
    476     send_pos += result;
    477 
    478     ss_->ProcessMessagesUntilIdle();
    479     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
    480     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
    481 
    482     // No more room in send or receive buffer
    483     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    484     EXPECT_EQ(-1, result);
    485     EXPECT_TRUE(a->IsBlocking());
    486 
    487     // Read a subset of the data
    488     result = b->Recv(recv_buffer + recv_pos, 500);
    489     EXPECT_EQ(500, result);
    490     recv_pos += result;
    491 
    492     ss_->ProcessMessagesUntilIdle();
    493     EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
    494     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    495 
    496     // Room for more on the sending side
    497     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    498     EXPECT_EQ(500, result);
    499     send_pos += result;
    500 
    501     // Empty the recv buffer
    502     while (true) {
    503       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    504       if (result < 0) {
    505         EXPECT_EQ(-1, result);
    506         EXPECT_TRUE(b->IsBlocking());
    507         break;
    508       }
    509       recv_pos += result;
    510     }
    511 
    512     ss_->ProcessMessagesUntilIdle();
    513     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    514 
    515     // Continue to empty the recv buffer
    516     while (true) {
    517       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    518       if (result < 0) {
    519         EXPECT_EQ(-1, result);
    520         EXPECT_TRUE(b->IsBlocking());
    521         break;
    522       }
    523       recv_pos += result;
    524     }
    525 
    526     // Send last of the data
    527     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
    528     EXPECT_EQ(500, result);
    529     send_pos += result;
    530 
    531     ss_->ProcessMessagesUntilIdle();
    532     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
    533 
    534     // Receive the last of the data
    535     while (true) {
    536       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
    537       if (result < 0) {
    538         EXPECT_EQ(-1, result);
    539         EXPECT_TRUE(b->IsBlocking());
    540         break;
    541       }
    542       recv_pos += result;
    543     }
    544 
    545     ss_->ProcessMessagesUntilIdle();
    546     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
    547 
    548     // The received data matches the sent data
    549     EXPECT_EQ(kDataSize, send_pos);
    550     EXPECT_EQ(kDataSize, recv_pos);
    551     EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
    552   }
    553 
    554   void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
    555     const SocketAddress kEmptyAddr;
    556 
    557     // Connect two sockets
    558     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
    559                                             SOCK_STREAM);
    560     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
    561                                             SOCK_STREAM);
    562     a->Bind(initial_addr);
    563     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
    564 
    565     b->Bind(initial_addr);
    566     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
    567 
    568     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
    569     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
    570     ss_->ProcessMessagesUntilIdle();
    571 
    572     // First, deliver all packets in 0 ms.
    573     char buffer[2] = { 0, 0 };
    574     const char cNumPackets = 10;
    575     for (char i = 0; i < cNumPackets; ++i) {
    576       buffer[0] = '0' + i;
    577       EXPECT_EQ(1, a->Send(buffer, 1));
    578     }
    579 
    580     ss_->ProcessMessagesUntilIdle();
    581 
    582     for (char i = 0; i < cNumPackets; ++i) {
    583       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
    584       EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
    585     }
    586 
    587     // Next, deliver packets at random intervals
    588     const uint32 mean = 50;
    589     const uint32 stddev = 50;
    590 
    591     ss_->set_delay_mean(mean);
    592     ss_->set_delay_stddev(stddev);
    593     ss_->UpdateDelayDistribution();
    594 
    595     for (char i = 0; i < cNumPackets; ++i) {
    596       buffer[0] = 'A' + i;
    597       EXPECT_EQ(1, a->Send(buffer, 1));
    598     }
    599 
    600     ss_->ProcessMessagesUntilIdle();
    601 
    602     for (char i = 0; i < cNumPackets; ++i) {
    603       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
    604       EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
    605     }
    606   }
    607 
    608   void BandwidthTest(const SocketAddress& initial_addr) {
    609     AsyncSocket* send_socket =
    610         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    611     AsyncSocket* recv_socket =
    612         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    613     ASSERT_EQ(0, send_socket->Bind(initial_addr));
    614     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
    615     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
    616     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
    617     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
    618 
    619     uint32 bandwidth = 64 * 1024;
    620     ss_->set_bandwidth(bandwidth);
    621 
    622     Thread* pthMain = Thread::Current();
    623     Sender sender(pthMain, send_socket, 80 * 1024);
    624     Receiver receiver(pthMain, recv_socket, bandwidth);
    625 
    626     pthMain->ProcessMessages(5000);
    627     sender.done = true;
    628     pthMain->ProcessMessages(5000);
    629 
    630     ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
    631     ASSERT_TRUE(receiver.count <= 6 * bandwidth);  // queue could drain for 1s
    632 
    633     ss_->set_bandwidth(0);
    634   }
    635 
    636   void DelayTest(const SocketAddress& initial_addr) {
    637     time_t seed = ::time(NULL);
    638     LOG(LS_VERBOSE) << "seed = " << seed;
    639     srand(static_cast<unsigned int>(seed));
    640 
    641     const uint32 mean = 2000;
    642     const uint32 stddev = 500;
    643 
    644     ss_->set_delay_mean(mean);
    645     ss_->set_delay_stddev(stddev);
    646     ss_->UpdateDelayDistribution();
    647 
    648     AsyncSocket* send_socket =
    649         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    650     AsyncSocket* recv_socket =
    651         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
    652     ASSERT_EQ(0, send_socket->Bind(initial_addr));
    653     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
    654     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
    655     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
    656     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
    657 
    658     Thread* pthMain = Thread::Current();
    659     // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
    660     // 1000 packets, which is necessary to get a good distribution.
    661     Sender sender(pthMain, send_socket, 100 * 2 * 1024);
    662     Receiver receiver(pthMain, recv_socket, 0);
    663 
    664     pthMain->ProcessMessages(10000);
    665     sender.done = receiver.done = true;
    666     ss_->ProcessMessagesUntilIdle();
    667 
    668     const double sample_mean = receiver.sum / receiver.samples;
    669     double num =
    670         receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
    671     double den = receiver.samples * (receiver.samples - 1);
    672     const double sample_stddev = sqrt(num / den);
    673     LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
    674 
    675     EXPECT_LE(500u, receiver.samples);
    676     // We initially used a 0.1 fudge factor, but on the build machine, we
    677     // have seen the value differ by as much as 0.13.
    678     EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
    679     EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
    680 
    681     ss_->set_delay_mean(0);
    682     ss_->set_delay_stddev(0);
    683     ss_->UpdateDelayDistribution();
    684   }
    685 
    686   // Test cross-family communication between a client bound to client_addr and a
    687   // server bound to server_addr. shouldSucceed indicates if communication is
    688   // expected to work or not.
    689   void CrossFamilyConnectionTest(const SocketAddress& client_addr,
    690                                  const SocketAddress& server_addr,
    691                                  bool shouldSucceed) {
    692     testing::StreamSink sink;
    693     SocketAddress accept_address;
    694     const SocketAddress kEmptyAddr;
    695 
    696     // Client gets a IPv4 address
    697     AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
    698                                                  SOCK_STREAM);
    699     sink.Monitor(client);
    700     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    701     EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
    702     client->Bind(client_addr);
    703 
    704     // Server gets a non-mapped non-any IPv6 address.
    705     // IPv4 sockets should not be able to connect to this.
    706     AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
    707                                                  SOCK_STREAM);
    708     sink.Monitor(server);
    709     server->Bind(server_addr);
    710     server->Listen(5);
    711 
    712     if (shouldSucceed) {
    713       EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
    714       ss_->ProcessMessagesUntilIdle();
    715       EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
    716       Socket* accepted = server->Accept(&accept_address);
    717       EXPECT_TRUE(NULL != accepted);
    718       EXPECT_NE(kEmptyAddr, accept_address);
    719       ss_->ProcessMessagesUntilIdle();
    720       EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
    721       EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
    722     } else {
    723       // Check that the connection failed.
    724       EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
    725       ss_->ProcessMessagesUntilIdle();
    726 
    727       EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
    728       EXPECT_TRUE(NULL == server->Accept(&accept_address));
    729       EXPECT_EQ(accept_address, kEmptyAddr);
    730       EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
    731       EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
    732       EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
    733     }
    734   }
    735 
    736   // Test cross-family datagram sending between a client bound to client_addr
    737   // and a server bound to server_addr. shouldSucceed indicates if sending is
    738   // expected to succed or not.
    739   void CrossFamilyDatagramTest(const SocketAddress& client_addr,
    740                                const SocketAddress& server_addr,
    741                                bool shouldSucceed) {
    742     AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
    743     socket->Bind(server_addr);
    744     SocketAddress bound_server_addr = socket->GetLocalAddress();
    745     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
    746 
    747     AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
    748     socket2->Bind(client_addr);
    749     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
    750     SocketAddress client2_addr;
    751 
    752     if (shouldSucceed) {
    753       EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
    754       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
    755       SocketAddress client1_addr;
    756       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
    757       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
    758       EXPECT_EQ(client1_addr, bound_server_addr);
    759     } else {
    760       EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
    761       EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
    762     }
    763   }
    764 
    765  protected:
    766   virtual void SetUp() {
    767     Thread::Current()->set_socketserver(ss_);
    768   }
    769   virtual void TearDown() {
    770     Thread::Current()->set_socketserver(NULL);
    771   }
    772 
    773   VirtualSocketServer* ss_;
    774   const SocketAddress kIPv4AnyAddress;
    775   const SocketAddress kIPv6AnyAddress;
    776 };
    777 
    778 TEST_F(VirtualSocketServerTest, basic_v4) {
    779   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
    780   BasicTest(ipv4_test_addr);
    781 }
    782 
    783 TEST_F(VirtualSocketServerTest, basic_v6) {
    784   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
    785   BasicTest(ipv6_test_addr);
    786 }
    787 
    788 TEST_F(VirtualSocketServerTest, connect_v4) {
    789   ConnectTest(kIPv4AnyAddress);
    790 }
    791 
    792 TEST_F(VirtualSocketServerTest, connect_v6) {
    793   ConnectTest(kIPv6AnyAddress);
    794 }
    795 
    796 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
    797   ConnectToNonListenerTest(kIPv4AnyAddress);
    798 }
    799 
    800 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
    801   ConnectToNonListenerTest(kIPv6AnyAddress);
    802 }
    803 
    804 TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
    805   CloseDuringConnectTest(kIPv4AnyAddress);
    806 }
    807 
    808 TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
    809   CloseDuringConnectTest(kIPv6AnyAddress);
    810 }
    811 
    812 TEST_F(VirtualSocketServerTest, close_v4) {
    813   CloseTest(kIPv4AnyAddress);
    814 }
    815 
    816 TEST_F(VirtualSocketServerTest, close_v6) {
    817   CloseTest(kIPv6AnyAddress);
    818 }
    819 
    820 TEST_F(VirtualSocketServerTest, tcp_send_v4) {
    821   TcpSendTest(kIPv4AnyAddress);
    822 }
    823 
    824 TEST_F(VirtualSocketServerTest, tcp_send_v6) {
    825   TcpSendTest(kIPv6AnyAddress);
    826 }
    827 
    828 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
    829   TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
    830 }
    831 
    832 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
    833   TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
    834 }
    835 
    836 TEST_F(VirtualSocketServerTest, bandwidth_v4) {
    837   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
    838   BandwidthTest(ipv4_test_addr);
    839 }
    840 
    841 TEST_F(VirtualSocketServerTest, bandwidth_v6) {
    842   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
    843   BandwidthTest(ipv6_test_addr);
    844 }
    845 
    846 TEST_F(VirtualSocketServerTest, delay_v4) {
    847   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
    848   DelayTest(ipv4_test_addr);
    849 }
    850 
    851 // See: https://code.google.com/p/webrtc/issues/detail?id=2409
    852 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) {
    853   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
    854   DelayTest(ipv6_test_addr);
    855 }
    856 
    857 // Works, receiving socket sees 127.0.0.2.
    858 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
    859   CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
    860                             SocketAddress("0.0.0.0", 5000),
    861                             true);
    862 }
    863 
    864 // Fails.
    865 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
    866   CrossFamilyConnectionTest(SocketAddress("::2", 0),
    867                             SocketAddress("0.0.0.0", 5000),
    868                             false);
    869 }
    870 
    871 // Fails.
    872 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
    873   CrossFamilyConnectionTest(SocketAddress("::2", 0),
    874                             SocketAddress("::ffff:127.0.0.1", 5000),
    875                             false);
    876 }
    877 
    878 // Works. receiving socket sees ::ffff:127.0.0.2.
    879 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
    880   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
    881                             SocketAddress("::", 5000),
    882                             true);
    883 }
    884 
    885 // Fails.
    886 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
    887   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
    888                             SocketAddress("::1", 5000),
    889                             false);
    890 }
    891 
    892 // Works. Receiving socket sees ::ffff:127.0.0.1.
    893 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
    894   CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
    895                             SocketAddress("::ffff:127.0.0.2", 5000),
    896                             true);
    897 }
    898 
    899 // Works, receiving socket sees a result from GetNextIP.
    900 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
    901   CrossFamilyConnectionTest(SocketAddress("::", 0),
    902                             SocketAddress("0.0.0.0", 5000),
    903                             true);
    904 }
    905 
    906 // Works, receiving socket sees whatever GetNextIP gave the client.
    907 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
    908   CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
    909                             SocketAddress("::", 5000),
    910                             true);
    911 }
    912 
    913 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
    914   CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
    915                           SocketAddress("::", 5000),
    916                           true);
    917 }
    918 
    919 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
    920   CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
    921                           SocketAddress("0.0.0.0", 5000),
    922                           true);
    923 }
    924 
    925 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
    926   CrossFamilyDatagramTest(SocketAddress("::2", 0),
    927                           SocketAddress("0.0.0.0", 5000),
    928                           false);
    929 }
    930 
    931 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
    932   CrossFamilyDatagramTest(SocketAddress("::2", 0),
    933                           SocketAddress("::ffff:127.0.0.1", 5000),
    934                           false);
    935 }
    936 
    937 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
    938   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
    939                           SocketAddress("::", 5000),
    940                           true);
    941 }
    942 
    943 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
    944   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
    945                           SocketAddress("::1", 5000),
    946                           false);
    947 }
    948 
    949 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
    950   CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
    951                           SocketAddress("::ffff:127.0.0.2", 5000),
    952                           true);
    953 }
    954 
    955 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
    956   CrossFamilyDatagramTest(SocketAddress("::", 0),
    957                           SocketAddress("0.0.0.0", 5000),
    958                           true);
    959 }
    960 
    961 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
    962   const uint32 kTestMean[] = { 10, 100, 333, 1000 };
    963   const double kTestDev[] = { 0.25, 0.1, 0.01 };
    964   // TODO: The current code only works for 1000 data points or more.
    965   const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
    966   for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
    967     for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
    968       for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
    969         ASSERT_LT(0u, kTestSamples[sidx]);
    970         const uint32 kStdDev =
    971             static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
    972         VirtualSocketServer::Function* f =
    973             VirtualSocketServer::CreateDistribution(kTestMean[midx],
    974                                                     kStdDev,
    975                                                     kTestSamples[sidx]);
    976         ASSERT_TRUE(NULL != f);
    977         ASSERT_EQ(kTestSamples[sidx], f->size());
    978         double sum = 0;
    979         for (uint32 i = 0; i < f->size(); ++i) {
    980           sum += (*f)[i].second;
    981         }
    982         const double mean = sum / f->size();
    983         double sum_sq_dev = 0;
    984         for (uint32 i = 0; i < f->size(); ++i) {
    985           double dev = (*f)[i].second - mean;
    986           sum_sq_dev += dev * dev;
    987         }
    988         const double stddev = sqrt(sum_sq_dev / f->size());
    989         EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
    990           << "M=" << kTestMean[midx]
    991           << " SD=" << kStdDev
    992           << " N=" << kTestSamples[sidx];
    993         EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
    994           << "M=" << kTestMean[midx]
    995           << " SD=" << kStdDev
    996           << " N=" << kTestSamples[sidx];
    997         delete f;
    998       }
    999     }
   1000   }
   1001 }
   1002