Home | History | Annotate | Download | only in test
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 // MSVC++ requires this to be set before any other includes to get M_PI.
      6 #define _USE_MATH_DEFINES
      7 
      8 #include "remoting/test/fake_socket_factory.h"
      9 
     10 #include <math.h>
     11 
     12 #include "base/bind.h"
     13 #include "base/callback.h"
     14 #include "base/location.h"
     15 #include "base/rand_util.h"
     16 #include "base/single_thread_task_runner.h"
     17 #include "base/thread_task_runner_handle.h"
     18 #include "net/base/io_buffer.h"
     19 #include "remoting/test/leaky_bucket.h"
     20 #include "third_party/webrtc/base/asyncpacketsocket.h"
     21 
     22 namespace remoting {
     23 
     24 namespace {
     25 
     26 const int kPortRangeStart = 1024;
     27 const int kPortRangeEnd = 65535;
     28 
     29 double GetNormalRandom(double average, double stddev) {
     30   // Based on Box-Muller transform, see
     31   // http://en.wikipedia.org/wiki/Box_Muller_transform .
     32   return average +
     33          stddev * sqrt(-2.0 * log(1.0 - base::RandDouble())) *
     34              cos(base::RandDouble() * 2.0 * M_PI);
     35 }
     36 
     37 class FakeUdpSocket : public rtc::AsyncPacketSocket {
     38  public:
     39   FakeUdpSocket(FakePacketSocketFactory* factory,
     40                 scoped_refptr<FakeNetworkDispatcher> dispatcher,
     41                 const rtc::SocketAddress& local_address);
     42   virtual ~FakeUdpSocket();
     43 
     44   void ReceivePacket(const rtc::SocketAddress& from,
     45                      const rtc::SocketAddress& to,
     46                      const scoped_refptr<net::IOBuffer>& data,
     47                      int data_size);
     48 
     49   // rtc::AsyncPacketSocket interface.
     50   virtual rtc::SocketAddress GetLocalAddress() const OVERRIDE;
     51   virtual rtc::SocketAddress GetRemoteAddress() const OVERRIDE;
     52   virtual int Send(const void* data, size_t data_size,
     53                    const rtc::PacketOptions& options) OVERRIDE;
     54   virtual int SendTo(const void* data, size_t data_size,
     55                      const rtc::SocketAddress& address,
     56                      const rtc::PacketOptions& options) OVERRIDE;
     57   virtual int Close() OVERRIDE;
     58   virtual State GetState() const OVERRIDE;
     59   virtual int GetOption(rtc::Socket::Option option, int* value) OVERRIDE;
     60   virtual int SetOption(rtc::Socket::Option option, int value) OVERRIDE;
     61   virtual int GetError() const OVERRIDE;
     62   virtual void SetError(int error) OVERRIDE;
     63 
     64  private:
     65   FakePacketSocketFactory* factory_;
     66   scoped_refptr<FakeNetworkDispatcher> dispatcher_;
     67   rtc::SocketAddress local_address_;
     68   State state_;
     69 
     70   DISALLOW_COPY_AND_ASSIGN(FakeUdpSocket);
     71 };
     72 
     73 FakeUdpSocket::FakeUdpSocket(FakePacketSocketFactory* factory,
     74                              scoped_refptr<FakeNetworkDispatcher> dispatcher,
     75                              const rtc::SocketAddress& local_address)
     76     : factory_(factory),
     77       dispatcher_(dispatcher),
     78       local_address_(local_address),
     79       state_(STATE_BOUND) {
     80 }
     81 
     82 FakeUdpSocket::~FakeUdpSocket() {
     83   factory_->OnSocketDestroyed(local_address_.port());
     84 }
     85 
     86 void FakeUdpSocket::ReceivePacket(const rtc::SocketAddress& from,
     87                                   const rtc::SocketAddress& to,
     88                                   const scoped_refptr<net::IOBuffer>& data,
     89                                   int data_size) {
     90   SignalReadPacket(
     91       this, data->data(), data_size, from, rtc::CreatePacketTime(0));
     92 }
     93 
     94 rtc::SocketAddress FakeUdpSocket::GetLocalAddress() const {
     95   return local_address_;
     96 }
     97 
     98 rtc::SocketAddress FakeUdpSocket::GetRemoteAddress() const {
     99   NOTREACHED();
    100   return rtc::SocketAddress();
    101 }
    102 
    103 int FakeUdpSocket::Send(const void* data, size_t data_size,
    104                         const rtc::PacketOptions& options) {
    105   NOTREACHED();
    106   return EINVAL;
    107 }
    108 
    109 int FakeUdpSocket::SendTo(const void* data, size_t data_size,
    110                           const rtc::SocketAddress& address,
    111                           const rtc::PacketOptions& options) {
    112   scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(data_size);
    113   memcpy(buffer->data(), data, data_size);
    114   dispatcher_->DeliverPacket(local_address_, address, buffer, data_size);
    115   return data_size;
    116 }
    117 
    118 int FakeUdpSocket::Close() {
    119   state_ = STATE_CLOSED;
    120   return 0;
    121 }
    122 
    123 rtc::AsyncPacketSocket::State FakeUdpSocket::GetState() const {
    124   return state_;
    125 }
    126 
    127 int FakeUdpSocket::GetOption(rtc::Socket::Option option, int* value) {
    128   NOTIMPLEMENTED();
    129   return -1;
    130 }
    131 
    132 int FakeUdpSocket::SetOption(rtc::Socket::Option option, int value) {
    133   NOTIMPLEMENTED();
    134   return -1;
    135 }
    136 
    137 int FakeUdpSocket::GetError() const {
    138   return 0;
    139 }
    140 
    141 void FakeUdpSocket::SetError(int error) {
    142   NOTREACHED();
    143 }
    144 
    145 }  // namespace
    146 
    147 FakePacketSocketFactory::PendingPacket::PendingPacket()
    148     : data_size(0) {
    149 }
    150 
    151 FakePacketSocketFactory::PendingPacket::PendingPacket(
    152     const rtc::SocketAddress& from,
    153     const rtc::SocketAddress& to,
    154     const scoped_refptr<net::IOBuffer>& data,
    155     int data_size)
    156     : from(from), to(to), data(data), data_size(data_size) {
    157 }
    158 
    159 FakePacketSocketFactory::PendingPacket::~PendingPacket() {
    160 }
    161 
    162 FakePacketSocketFactory::FakePacketSocketFactory(
    163     FakeNetworkDispatcher* dispatcher)
    164     : task_runner_(base::ThreadTaskRunnerHandle::Get()),
    165       dispatcher_(dispatcher),
    166       address_(dispatcher_->AllocateAddress()),
    167       out_of_order_rate_(0.0),
    168       next_port_(kPortRangeStart),
    169       weak_factory_(this) {
    170   dispatcher_->AddNode(this);
    171 }
    172 
    173 FakePacketSocketFactory::~FakePacketSocketFactory() {
    174   CHECK(udp_sockets_.empty());
    175   dispatcher_->RemoveNode(this);
    176 }
    177 
    178 void FakePacketSocketFactory::OnSocketDestroyed(int port) {
    179   DCHECK(task_runner_->BelongsToCurrentThread());
    180   udp_sockets_.erase(port);
    181 }
    182 
    183 void FakePacketSocketFactory::SetBandwidth(int bandwidth, int max_buffer) {
    184   DCHECK(task_runner_->BelongsToCurrentThread());
    185   if (bandwidth <= 0) {
    186     leaky_bucket_.reset();
    187   } else {
    188     leaky_bucket_.reset(new LeakyBucket(max_buffer, bandwidth));
    189   }
    190 }
    191 
    192 void FakePacketSocketFactory::SetLatency(base::TimeDelta average,
    193                                          base::TimeDelta stddev) {
    194   DCHECK(task_runner_->BelongsToCurrentThread());
    195   latency_average_ = average;
    196   latency_stddev_ = stddev;
    197 }
    198 
    199 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateUdpSocket(
    200     const rtc::SocketAddress& local_address,
    201     int min_port, int max_port) {
    202   DCHECK(task_runner_->BelongsToCurrentThread());
    203 
    204   int port = -1;
    205   if (min_port > 0 && max_port > 0) {
    206     for (int i = min_port; i <= max_port; ++i) {
    207       if (udp_sockets_.find(i) == udp_sockets_.end()) {
    208         port = i;
    209         break;
    210       }
    211     }
    212     if (port < 0)
    213       return NULL;
    214   } else {
    215     do {
    216       port = next_port_;
    217       next_port_ =
    218           (next_port_ >= kPortRangeEnd) ? kPortRangeStart : (next_port_ + 1);
    219     } while (udp_sockets_.find(port) != udp_sockets_.end());
    220   }
    221 
    222   CHECK(local_address.ipaddr() == address_);
    223 
    224   FakeUdpSocket* result =
    225       new FakeUdpSocket(this, dispatcher_,
    226                         rtc::SocketAddress(local_address.ipaddr(), port));
    227 
    228   udp_sockets_[port] =
    229       base::Bind(&FakeUdpSocket::ReceivePacket, base::Unretained(result));
    230 
    231   return result;
    232 }
    233 
    234 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateServerTcpSocket(
    235     const rtc::SocketAddress& local_address,
    236     int min_port, int max_port,
    237     int opts) {
    238   return NULL;
    239 }
    240 
    241 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateClientTcpSocket(
    242     const rtc::SocketAddress& local_address,
    243     const rtc::SocketAddress& remote_address,
    244     const rtc::ProxyInfo& proxy_info,
    245     const std::string& user_agent,
    246     int opts) {
    247   return NULL;
    248 }
    249 
    250 rtc::AsyncResolverInterface*
    251 FakePacketSocketFactory::CreateAsyncResolver() {
    252   return NULL;
    253 }
    254 
    255 const scoped_refptr<base::SingleThreadTaskRunner>&
    256 FakePacketSocketFactory::GetThread() const {
    257   return task_runner_;
    258 }
    259 
    260 const rtc::IPAddress& FakePacketSocketFactory::GetAddress() const {
    261   return address_;
    262 }
    263 
    264 void FakePacketSocketFactory::ReceivePacket(
    265     const rtc::SocketAddress& from,
    266     const rtc::SocketAddress& to,
    267     const scoped_refptr<net::IOBuffer>& data,
    268     int data_size) {
    269   DCHECK(task_runner_->BelongsToCurrentThread());
    270   DCHECK(to.ipaddr() == address_);
    271 
    272   base::TimeDelta delay;
    273 
    274   if (leaky_bucket_) {
    275     delay = leaky_bucket_->AddPacket(data_size);
    276     if (delay.is_max()) {
    277       // Drop the packet.
    278       return;
    279     }
    280   }
    281 
    282   if (latency_average_ > base::TimeDelta()) {
    283     delay += base::TimeDelta::FromMillisecondsD(
    284         GetNormalRandom(latency_average_.InMillisecondsF(),
    285                         latency_stddev_.InMillisecondsF()));
    286   }
    287   if (delay < base::TimeDelta())
    288     delay = base::TimeDelta();
    289 
    290   // Put the packet to the |pending_packets_| and post a task for
    291   // DoReceivePackets(). Note that the DoReceivePackets() task posted here may
    292   // deliver a different packet, not the one added to the queue here. This
    293   // would happen if another task gets posted with a shorted delay or when
    294   // |out_of_order_rate_| is greater than 0. It's implemented this way to
    295   // decouple latency variability from out-of-order delivery.
    296   PendingPacket packet(from, to, data, data_size);
    297   pending_packets_.push_back(packet);
    298   task_runner_->PostDelayedTask(
    299       FROM_HERE,
    300       base::Bind(&FakePacketSocketFactory::DoReceivePacket,
    301                  weak_factory_.GetWeakPtr()),
    302       delay);
    303 }
    304 
    305 void FakePacketSocketFactory::DoReceivePacket() {
    306   DCHECK(task_runner_->BelongsToCurrentThread());
    307 
    308   PendingPacket packet;
    309   if (pending_packets_.size() > 1 && base::RandDouble() < out_of_order_rate_) {
    310     std::list<PendingPacket>::iterator it = pending_packets_.begin();
    311     ++it;
    312     packet = *it;
    313     pending_packets_.erase(it);
    314   } else {
    315     packet = pending_packets_.front();
    316     pending_packets_.pop_front();
    317   }
    318 
    319   UdpSocketsMap::iterator iter = udp_sockets_.find(packet.to.port());
    320   if (iter == udp_sockets_.end()) {
    321     // Invalid port number.
    322     return;
    323   }
    324 
    325   iter->second.Run(packet.from, packet.to, packet.data, packet.data_size);
    326 }
    327 
    328 }  // namespace remoting
    329