Home | History | Annotate | Download | only in utility
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <math.h>
      6 #include <stdlib.h>
      7 #include <vector>
      8 
      9 #include "media/cast/test/utility/udp_proxy.h"
     10 
     11 #include "base/logging.h"
     12 #include "base/rand_util.h"
     13 #include "base/synchronization/waitable_event.h"
     14 #include "base/threading/thread.h"
     15 #include "base/time/default_tick_clock.h"
     16 #include "net/base/io_buffer.h"
     17 #include "net/base/net_errors.h"
     18 #include "net/udp/udp_socket.h"
     19 
     20 namespace media {
     21 namespace cast {
     22 namespace test {
     23 
     24 const size_t kMaxPacketSize = 65536;
     25 
     26 PacketPipe::PacketPipe() {}
     27 PacketPipe::~PacketPipe() {}
     28 void PacketPipe::InitOnIOThread(
     29     const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
     30     base::TickClock* clock) {
     31   task_runner_ = task_runner;
     32   clock_ = clock;
     33   if (pipe_) {
     34     pipe_->InitOnIOThread(task_runner, clock);
     35   }
     36 }
     37 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
     38   if (pipe_) {
     39     pipe_->AppendToPipe(pipe.Pass());
     40   } else {
     41     pipe_ = pipe.Pass();
     42   }
     43 }
     44 
     45 // Roughly emulates a buffer inside a device.
     46 // If the buffer is full, packets are dropped.
     47 // Packets are output at a maximum bandwidth.
     48 class Buffer : public PacketPipe {
     49  public:
     50   Buffer(size_t buffer_size, double max_megabits_per_second)
     51       : buffer_size_(0),
     52         max_buffer_size_(buffer_size),
     53         max_megabits_per_second_(max_megabits_per_second),
     54         weak_factory_(this) {
     55     CHECK_GT(max_buffer_size_, 0UL);
     56     CHECK_GT(max_megabits_per_second, 0);
     57   }
     58 
     59   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
     60     if (packet->size() + buffer_size_ <= max_buffer_size_) {
     61       buffer_size_ += packet->size();
     62       buffer_.push_back(linked_ptr<Packet>(packet.release()));
     63       if (buffer_.size() == 1) {
     64         Schedule();
     65       }
     66     }
     67   }
     68 
     69  private:
     70   void Schedule() {
     71     last_schedule_ = clock_->NowTicks();
     72     double megabits = buffer_.front()->size() * 8 / 1000000.0;
     73     double seconds = megabits / max_megabits_per_second_;
     74     int64 microseconds = static_cast<int64>(seconds * 1E6);
     75     task_runner_->PostDelayedTask(
     76         FROM_HERE,
     77         base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
     78         base::TimeDelta::FromMicroseconds(microseconds));
     79   }
     80 
     81   void ProcessBuffer() {
     82     int64 bytes_to_send = static_cast<int64>(
     83         (clock_->NowTicks() - last_schedule_).InSecondsF() *
     84         max_megabits_per_second_ * 1E6 / 8);
     85     if (bytes_to_send < static_cast<int64>(buffer_.front()->size())) {
     86       bytes_to_send = buffer_.front()->size();
     87     }
     88     while (!buffer_.empty() &&
     89            static_cast<int64>(buffer_.front()->size()) <= bytes_to_send) {
     90       CHECK(!buffer_.empty());
     91       scoped_ptr<Packet> packet(buffer_.front().release());
     92       bytes_to_send -= packet->size();
     93       buffer_size_ -= packet->size();
     94       buffer_.pop_front();
     95       pipe_->Send(packet.Pass());
     96     }
     97     if (!buffer_.empty()) {
     98       Schedule();
     99     }
    100   }
    101 
    102   std::deque<linked_ptr<Packet> > buffer_;
    103   base::TimeTicks last_schedule_;
    104   size_t buffer_size_;
    105   size_t max_buffer_size_;
    106   double max_megabits_per_second_;  // megabits per second
    107   base::WeakPtrFactory<Buffer> weak_factory_;
    108 };
    109 
    110 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
    111   return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
    112 }
    113 
    114 class RandomDrop : public PacketPipe {
    115  public:
    116   RandomDrop(double drop_fraction)
    117       : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
    118 
    119   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
    120     if (rand() > drop_fraction_) {
    121       pipe_->Send(packet.Pass());
    122     }
    123   }
    124 
    125  private:
    126   int drop_fraction_;
    127 };
    128 
    129 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
    130   return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
    131 }
    132 
    133 class SimpleDelayBase : public PacketPipe {
    134  public:
    135   SimpleDelayBase() : weak_factory_(this) {}
    136   virtual ~SimpleDelayBase() {}
    137 
    138   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
    139     double seconds = GetDelay();
    140     task_runner_->PostDelayedTask(
    141         FROM_HERE,
    142         base::Bind(&SimpleDelayBase::SendInternal,
    143                    weak_factory_.GetWeakPtr(),
    144                    base::Passed(&packet)),
    145         base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
    146   }
    147  protected:
    148   virtual double GetDelay() = 0;
    149 
    150  private:
    151   virtual void SendInternal(scoped_ptr<Packet> packet) {
    152     pipe_->Send(packet.Pass());
    153   }
    154 
    155   base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
    156 };
    157 
    158 class ConstantDelay : public SimpleDelayBase {
    159  public:
    160   ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
    161   virtual double GetDelay() OVERRIDE {
    162     return delay_seconds_;
    163   }
    164 
    165  private:
    166   double delay_seconds_;
    167 };
    168 
    169 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
    170   return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
    171 }
    172 
    173 class RandomUnsortedDelay : public SimpleDelayBase {
    174  public:
    175   RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
    176 
    177   virtual double GetDelay() OVERRIDE {
    178     return random_delay_ * base::RandDouble();
    179   }
    180 
    181  private:
    182   double random_delay_;
    183 };
    184 
    185 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
    186   return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
    187 }
    188 
    189 class DuplicateAndDelay : public RandomUnsortedDelay {
    190  public:
    191   DuplicateAndDelay(double delay_min,
    192                     double random_delay) :
    193       RandomUnsortedDelay(random_delay),
    194       delay_min_(delay_min) {
    195   }
    196   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
    197     pipe_->Send(scoped_ptr<Packet>(new Packet(*packet.get())));
    198     RandomUnsortedDelay::Send(packet.Pass());
    199   }
    200   virtual double GetDelay() OVERRIDE {
    201     return RandomUnsortedDelay::GetDelay() + delay_min_;
    202   }
    203  private:
    204   double delay_min_;
    205 };
    206 
    207 scoped_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min,
    208                                             double random_delay) {
    209   return scoped_ptr<PacketPipe>(
    210       new DuplicateAndDelay(delay_min, random_delay)).Pass();
    211 }
    212 
    213 class RandomSortedDelay : public PacketPipe {
    214  public:
    215   RandomSortedDelay(double random_delay,
    216                     double extra_delay,
    217                     double seconds_between_extra_delay)
    218       : random_delay_(random_delay),
    219         extra_delay_(extra_delay),
    220         seconds_between_extra_delay_(seconds_between_extra_delay),
    221         weak_factory_(this) {}
    222 
    223   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
    224     buffer_.push_back(linked_ptr<Packet>(packet.release()));
    225     if (buffer_.size() == 1) {
    226       next_send_ = std::max(
    227           clock_->NowTicks() +
    228           base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_),
    229           next_send_);
    230       ProcessBuffer();
    231     }
    232   }
    233   virtual void InitOnIOThread(
    234       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
    235       base::TickClock* clock) OVERRIDE {
    236     PacketPipe::InitOnIOThread(task_runner, clock);
    237     // As we start the stream, assume that we are in a random
    238     // place between two extra delays, thus multiplier = 1.0;
    239     ScheduleExtraDelay(1.0);
    240   }
    241 
    242  private:
    243   void ScheduleExtraDelay(double mult) {
    244     double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
    245     int64 microseconds = static_cast<int64>(seconds * 1E6);
    246     task_runner_->PostDelayedTask(
    247         FROM_HERE,
    248         base::Bind(&RandomSortedDelay::CauseExtraDelay,
    249                    weak_factory_.GetWeakPtr()),
    250         base::TimeDelta::FromMicroseconds(microseconds));
    251   }
    252 
    253   void CauseExtraDelay() {
    254     next_send_ = std::max<base::TimeTicks>(
    255         clock_->NowTicks() +
    256         base::TimeDelta::FromMicroseconds(
    257             static_cast<int64>(extra_delay_ * 1E6)),
    258         next_send_);
    259     // An extra delay just happened, wait up to seconds_between_extra_delay_*2
    260     // before scheduling another one to make the average equal to
    261     // seconds_between_extra_delay_.
    262     ScheduleExtraDelay(2.0);
    263   }
    264 
    265   void ProcessBuffer() {
    266     base::TimeTicks now = clock_->NowTicks();
    267     while (!buffer_.empty() && next_send_ <= now) {
    268       scoped_ptr<Packet> packet(buffer_.front().release());
    269       pipe_->Send(packet.Pass());
    270       buffer_.pop_front();
    271 
    272       next_send_ += base::TimeDelta::FromSecondsD(
    273           base::RandDouble() * random_delay_);
    274     }
    275 
    276     if (!buffer_.empty()) {
    277       task_runner_->PostDelayedTask(
    278           FROM_HERE,
    279           base::Bind(&RandomSortedDelay::ProcessBuffer,
    280                      weak_factory_.GetWeakPtr()),
    281           next_send_ - now);
    282     }
    283   }
    284 
    285   base::TimeTicks block_until_;
    286   std::deque<linked_ptr<Packet> > buffer_;
    287   double random_delay_;
    288   double extra_delay_;
    289   double seconds_between_extra_delay_;
    290   base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
    291   base::TimeTicks next_send_;
    292 };
    293 
    294 scoped_ptr<PacketPipe> NewRandomSortedDelay(
    295     double random_delay,
    296     double extra_delay,
    297     double seconds_between_extra_delay) {
    298   return scoped_ptr<PacketPipe>(
    299              new RandomSortedDelay(
    300                  random_delay, extra_delay, seconds_between_extra_delay))
    301       .Pass();
    302 }
    303 
    304 class NetworkGlitchPipe : public PacketPipe {
    305  public:
    306   NetworkGlitchPipe(double average_work_time, double average_outage_time)
    307       : works_(false),
    308         max_work_time_(average_work_time * 2),
    309         max_outage_time_(average_outage_time * 2),
    310         weak_factory_(this) {}
    311 
    312   virtual void InitOnIOThread(
    313       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
    314       base::TickClock* clock) OVERRIDE {
    315     PacketPipe::InitOnIOThread(task_runner, clock);
    316     Flip();
    317   }
    318 
    319   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
    320     if (works_) {
    321       pipe_->Send(packet.Pass());
    322     }
    323   }
    324 
    325  private:
    326   void Flip() {
    327     works_ = !works_;
    328     double seconds = base::RandDouble() *
    329         (works_ ? max_work_time_ : max_outage_time_);
    330     int64 microseconds = static_cast<int64>(seconds * 1E6);
    331     task_runner_->PostDelayedTask(
    332         FROM_HERE,
    333         base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
    334         base::TimeDelta::FromMicroseconds(microseconds));
    335   }
    336 
    337   bool works_;
    338   double max_work_time_;
    339   double max_outage_time_;
    340   base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
    341 };
    342 
    343 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
    344                                             double average_outage_time) {
    345   return scoped_ptr<PacketPipe>(
    346              new NetworkGlitchPipe(average_work_time, average_outage_time))
    347       .Pass();
    348 }
    349 
    350 
    351 // Internal buffer object for a client of the IPP model.
    352 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
    353  public:
    354   InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp,
    355                  size_t size)
    356       : ipp_(ipp),
    357         stored_size_(0),
    358         stored_limit_(size),
    359         clock_(NULL),
    360         weak_factory_(this) {
    361   }
    362 
    363   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
    364     // Drop if buffer is full.
    365     if (stored_size_ >= stored_limit_)
    366       return;
    367     stored_size_ += packet->size();
    368     buffer_.push_back(linked_ptr<Packet>(packet.release()));
    369     buffer_time_.push_back(clock_->NowTicks());
    370     DCHECK(buffer_.size() == buffer_time_.size());
    371   }
    372 
    373   virtual void InitOnIOThread(
    374       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
    375       base::TickClock* clock) OVERRIDE {
    376     clock_ = clock;
    377     if (ipp_)
    378       ipp_->InitOnIOThread(task_runner, clock);
    379     PacketPipe::InitOnIOThread(task_runner, clock);
    380   }
    381 
    382   void SendOnePacket() {
    383     scoped_ptr<Packet> packet(buffer_.front().release());
    384     stored_size_ -= packet->size();
    385     buffer_.pop_front();
    386     buffer_time_.pop_front();
    387     pipe_->Send(packet.Pass());
    388     DCHECK(buffer_.size() == buffer_time_.size());
    389   }
    390 
    391   bool Empty() const {
    392     return buffer_.empty();
    393   }
    394 
    395   base::TimeTicks FirstPacketTime() const {
    396     DCHECK(!buffer_time_.empty());
    397     return buffer_time_.front();
    398   }
    399 
    400   base::WeakPtr<InternalBuffer> GetWeakPtr() {
    401     return weak_factory_.GetWeakPtr();
    402 
    403   }
    404 
    405  private:
    406   const base::WeakPtr<InterruptedPoissonProcess> ipp_;
    407   size_t stored_size_;
    408   const size_t stored_limit_;
    409   std::deque<linked_ptr<Packet> > buffer_;
    410   std::deque<base::TimeTicks> buffer_time_;
    411   base::TickClock* clock_;
    412   base::WeakPtrFactory<InternalBuffer> weak_factory_;
    413 
    414   DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
    415 };
    416 
    417 InterruptedPoissonProcess::InterruptedPoissonProcess(
    418     const std::vector<double>& average_rates,
    419     double coef_burstiness,
    420     double coef_variance,
    421     uint32 rand_seed)
    422     : clock_(NULL),
    423       average_rates_(average_rates),
    424       coef_burstiness_(coef_burstiness),
    425       coef_variance_(coef_variance),
    426       rate_index_(0),
    427       on_state_(true),
    428       weak_factory_(this) {
    429   mt_rand_.init_genrand(rand_seed);
    430   DCHECK(!average_rates.empty());
    431   ComputeRates();
    432 }
    433 
    434 InterruptedPoissonProcess::~InterruptedPoissonProcess() {
    435 }
    436 
    437 void InterruptedPoissonProcess::InitOnIOThread(
    438     const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
    439     base::TickClock* clock) {
    440   // Already initialized and started.
    441   if (task_runner_.get() && clock_)
    442     return;
    443   task_runner_ = task_runner;
    444   clock_ = clock;
    445   UpdateRates();
    446   SwitchOn();
    447   SendPacket();
    448 }
    449 
    450 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
    451   scoped_ptr<InternalBuffer> buffer(
    452       new InternalBuffer(weak_factory_.GetWeakPtr(), size));
    453   send_buffers_.push_back(buffer->GetWeakPtr());
    454   return buffer.PassAs<PacketPipe>();
    455 }
    456 
    457 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
    458   // Rate is per milliseconds.
    459   // The time until next event is exponentially distributed to the
    460   // inverse of |rate|.
    461   return base::TimeDelta::FromMillisecondsD(
    462       fabs(-log(1.0 - RandDouble()) / rate));
    463 }
    464 
    465 double InterruptedPoissonProcess::RandDouble() {
    466   // Generate a 64-bits random number from MT19937 and then convert
    467   // it to double.
    468   uint64 rand = mt_rand_.genrand_int32();
    469   rand <<= 32;
    470   rand |= mt_rand_.genrand_int32();
    471   return base::BitsToOpenEndedUnitInterval(rand);
    472 }
    473 
    474 void InterruptedPoissonProcess::ComputeRates() {
    475   double avg_rate = average_rates_[rate_index_];
    476 
    477   send_rate_ = avg_rate / coef_burstiness_;
    478   switch_off_rate_ =
    479       2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) /
    480       coef_burstiness_ / (coef_variance_ - 1);
    481   switch_on_rate_ =
    482       2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1);
    483 }
    484 
    485 void InterruptedPoissonProcess::UpdateRates() {
    486   ComputeRates();
    487 
    488   // Rates are updated once per second.
    489   rate_index_ = (rate_index_ + 1) % average_rates_.size();
    490   task_runner_->PostDelayedTask(
    491       FROM_HERE,
    492       base::Bind(&InterruptedPoissonProcess::UpdateRates,
    493                  weak_factory_.GetWeakPtr()),
    494       base::TimeDelta::FromSeconds(1));
    495 }
    496 
    497 void InterruptedPoissonProcess::SwitchOff() {
    498   on_state_ = false;
    499   task_runner_->PostDelayedTask(
    500       FROM_HERE,
    501       base::Bind(&InterruptedPoissonProcess::SwitchOn,
    502                  weak_factory_.GetWeakPtr()),
    503       NextEvent(switch_on_rate_));
    504 }
    505 
    506 void InterruptedPoissonProcess::SwitchOn() {
    507   on_state_ = true;
    508   task_runner_->PostDelayedTask(
    509       FROM_HERE,
    510       base::Bind(&InterruptedPoissonProcess::SwitchOff,
    511                  weak_factory_.GetWeakPtr()),
    512       NextEvent(switch_off_rate_));
    513 }
    514 
    515 void InterruptedPoissonProcess::SendPacket() {
    516   task_runner_->PostDelayedTask(
    517       FROM_HERE,
    518       base::Bind(&InterruptedPoissonProcess::SendPacket,
    519                  weak_factory_.GetWeakPtr()),
    520       NextEvent(send_rate_));
    521 
    522   // If OFF then don't send.
    523   if (!on_state_)
    524     return;
    525 
    526   // Find the earliest packet to send.
    527   base::TimeTicks earliest_time;
    528   for (size_t i = 0; i < send_buffers_.size(); ++i) {
    529     if (!send_buffers_[i])
    530       continue;
    531     if (send_buffers_[i]->Empty())
    532       continue;
    533     if (earliest_time.is_null() ||
    534         send_buffers_[i]->FirstPacketTime() < earliest_time)
    535       earliest_time = send_buffers_[i]->FirstPacketTime();
    536   }
    537   for (size_t i = 0; i < send_buffers_.size(); ++i) {
    538     if (!send_buffers_[i])
    539       continue;
    540     if (send_buffers_[i]->Empty())
    541       continue;
    542     if (send_buffers_[i]->FirstPacketTime() != earliest_time)
    543       continue;
    544     send_buffers_[i]->SendOnePacket();
    545     break;
    546   }
    547 }
    548 
    549 class UDPProxyImpl;
    550 
    551 class PacketSender : public PacketPipe {
    552  public:
    553   PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
    554       : udp_proxy_(udp_proxy), destination_(destination) {}
    555   virtual void Send(scoped_ptr<Packet> packet) OVERRIDE;
    556   virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
    557     NOTREACHED();
    558   }
    559 
    560  private:
    561   UDPProxyImpl* udp_proxy_;
    562   const net::IPEndPoint* destination_;  // not owned
    563 };
    564 
    565 namespace {
    566 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
    567   if (*pipe) {
    568     (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
    569   } else {
    570     pipe->reset(next);
    571   }
    572 }
    573 }  // namespace
    574 
    575 scoped_ptr<PacketPipe> GoodNetwork() {
    576   // This represents the buffer on the sender.
    577   scoped_ptr<PacketPipe> pipe;
    578   BuildPipe(&pipe, new Buffer(2 << 20, 50));
    579   BuildPipe(&pipe, new ConstantDelay(1E-3));
    580   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 2E-3, 3));
    581   // This represents the buffer on the receiving device.
    582   BuildPipe(&pipe, new Buffer(2 << 20, 50));
    583   return pipe.Pass();
    584 }
    585 
    586 scoped_ptr<PacketPipe> WifiNetwork() {
    587   // This represents the buffer on the sender.
    588   scoped_ptr<PacketPipe> pipe;
    589   BuildPipe(&pipe, new Buffer(256 << 10, 20));
    590   BuildPipe(&pipe, new RandomDrop(0.005));
    591   // This represents the buffer on the router.
    592   BuildPipe(&pipe, new ConstantDelay(1E-3));
    593   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
    594   BuildPipe(&pipe, new Buffer(256 << 10, 20));
    595   BuildPipe(&pipe, new ConstantDelay(1E-3));
    596   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
    597   BuildPipe(&pipe, new RandomDrop(0.005));
    598   // This represents the buffer on the receiving device.
    599   BuildPipe(&pipe, new Buffer(256 << 10, 20));
    600   return pipe.Pass();
    601 }
    602 
    603 scoped_ptr<PacketPipe> BadNetwork() {
    604   scoped_ptr<PacketPipe> pipe;
    605   // This represents the buffer on the sender.
    606   BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
    607   BuildPipe(&pipe, new RandomDrop(0.05));  // 5% packet drop
    608   BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
    609   // This represents the buffer on the router.
    610   BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 4mbit/s
    611   BuildPipe(&pipe, new ConstantDelay(1E-3));
    612   // Random 40ms every other second
    613   //  BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
    614   BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
    615   // This represents the buffer on the receiving device.
    616   BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 5mbit/s
    617   return pipe.Pass();
    618 }
    619 
    620 
    621 scoped_ptr<PacketPipe> EvilNetwork() {
    622   // This represents the buffer on the sender.
    623   scoped_ptr<PacketPipe> pipe;
    624   BuildPipe(&pipe, new Buffer(4 << 10, 5));  // 4 kb buf, 2mbit/s
    625   // This represents the buffer on the router.
    626   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
    627   BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
    628   BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
    629   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
    630   BuildPipe(&pipe, new ConstantDelay(1E-3));
    631   BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
    632   BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
    633   // This represents the buffer on the receiving device.
    634   BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
    635   return pipe.Pass();
    636 }
    637 
    638 class UDPProxyImpl : public UDPProxy {
    639  public:
    640   UDPProxyImpl(const net::IPEndPoint& local_port,
    641                const net::IPEndPoint& destination,
    642                scoped_ptr<PacketPipe> to_dest_pipe,
    643                scoped_ptr<PacketPipe> from_dest_pipe,
    644                net::NetLog* net_log)
    645       : local_port_(local_port),
    646         destination_(destination),
    647         destination_is_mutable_(destination.address().empty()),
    648         proxy_thread_("media::cast::test::UdpProxy Thread"),
    649         to_dest_pipe_(to_dest_pipe.Pass()),
    650         from_dest_pipe_(from_dest_pipe.Pass()),
    651         blocked_(false),
    652         weak_factory_(this) {
    653     proxy_thread_.StartWithOptions(
    654         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
    655     base::WaitableEvent start_event(false, false);
    656     proxy_thread_.message_loop_proxy()->PostTask(
    657         FROM_HERE,
    658         base::Bind(&UDPProxyImpl::Start,
    659                    base::Unretained(this),
    660                    base::Unretained(&start_event),
    661                    net_log));
    662     start_event.Wait();
    663   }
    664 
    665   virtual ~UDPProxyImpl() {
    666     base::WaitableEvent stop_event(false, false);
    667     proxy_thread_.message_loop_proxy()->PostTask(
    668         FROM_HERE,
    669         base::Bind(&UDPProxyImpl::Stop,
    670                    base::Unretained(this),
    671                    base::Unretained(&stop_event)));
    672     stop_event.Wait();
    673     proxy_thread_.Stop();
    674   }
    675 
    676   void Send(scoped_ptr<Packet> packet,
    677             const net::IPEndPoint& destination) {
    678     if (blocked_) {
    679       LOG(ERROR) << "Cannot write packet right now: blocked";
    680       return;
    681     }
    682 
    683     VLOG(1) << "Sending packet, len = " << packet->size();
    684     // We ignore all problems, callbacks and errors.
    685     // If it didn't work we just drop the packet at and call it a day.
    686     scoped_refptr<net::IOBuffer> buf =
    687         new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
    688     size_t buf_size = packet->size();
    689     int result;
    690     if (destination.address().empty()) {
    691       VLOG(1) << "Destination has not been set yet.";
    692       result = net::ERR_INVALID_ARGUMENT;
    693     } else {
    694       VLOG(1) << "Destination:" << destination.ToString();
    695       result = socket_->SendTo(buf.get(),
    696                                static_cast<int>(buf_size),
    697                                destination,
    698                                base::Bind(&UDPProxyImpl::AllowWrite,
    699                                           weak_factory_.GetWeakPtr(),
    700                                           buf,
    701                                           base::Passed(&packet)));
    702     }
    703     if (result == net::ERR_IO_PENDING) {
    704       blocked_ = true;
    705     } else if (result < 0) {
    706       LOG(ERROR) << "Failed to write packet.";
    707     }
    708   }
    709 
    710  private:
    711   void Start(base::WaitableEvent* start_event,
    712              net::NetLog* net_log) {
    713     socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
    714                                      net::RandIntCallback(),
    715                                      net_log,
    716                                      net::NetLog::Source()));
    717     BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
    718     BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
    719     to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
    720                                   &tick_clock_);
    721     from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
    722                                     &tick_clock_);
    723 
    724     VLOG(0) << "From:" << local_port_.ToString();
    725     if (!destination_is_mutable_)
    726       VLOG(0) << "To:" << destination_.ToString();
    727 
    728     CHECK_GE(socket_->Bind(local_port_), 0);
    729 
    730     start_event->Signal();
    731     PollRead();
    732   }
    733 
    734   void Stop(base::WaitableEvent* stop_event) {
    735     to_dest_pipe_.reset(NULL);
    736     from_dest_pipe_.reset(NULL);
    737     socket_.reset(NULL);
    738     stop_event->Signal();
    739   }
    740 
    741   void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
    742     DCHECK_NE(len, net::ERR_IO_PENDING);
    743     VLOG(1) << "Got packet, len = " << len;
    744     if (len < 0) {
    745       LOG(WARNING) << "Socket read error: " << len;
    746       return;
    747     }
    748     packet_->resize(len);
    749     if (destination_is_mutable_ && set_destination_next_ &&
    750         !(recv_address_ == return_address_) &&
    751         !(recv_address_ == destination_)) {
    752       destination_ = recv_address_;
    753     }
    754     if (recv_address_ == destination_) {
    755       set_destination_next_ = false;
    756       from_dest_pipe_->Send(packet_.Pass());
    757     } else {
    758       set_destination_next_ = true;
    759       VLOG(1) << "Return address = " << recv_address_.ToString();
    760       return_address_ = recv_address_;
    761       to_dest_pipe_->Send(packet_.Pass());
    762     }
    763   }
    764 
    765   void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
    766     ProcessPacket(recv_buf, len);
    767     PollRead();
    768   }
    769 
    770   void PollRead() {
    771     while (true) {
    772       packet_.reset(new Packet(kMaxPacketSize));
    773       scoped_refptr<net::IOBuffer> recv_buf =
    774           new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
    775       int len = socket_->RecvFrom(
    776           recv_buf.get(),
    777           kMaxPacketSize,
    778           &recv_address_,
    779           base::Bind(
    780               &UDPProxyImpl::ReadCallback, base::Unretained(this), recv_buf));
    781       if (len == net::ERR_IO_PENDING)
    782         break;
    783       ProcessPacket(recv_buf, len);
    784     }
    785   }
    786 
    787   void AllowWrite(scoped_refptr<net::IOBuffer> buf,
    788                   scoped_ptr<Packet> packet,
    789                   int unused_len) {
    790     DCHECK(blocked_);
    791     blocked_ = false;
    792   }
    793 
    794   // Input
    795   net::IPEndPoint local_port_;
    796 
    797   net::IPEndPoint destination_;
    798   bool destination_is_mutable_;
    799 
    800   net::IPEndPoint return_address_;
    801   bool set_destination_next_;
    802 
    803   base::DefaultTickClock tick_clock_;
    804   base::Thread proxy_thread_;
    805   scoped_ptr<net::UDPSocket> socket_;
    806   scoped_ptr<PacketPipe> to_dest_pipe_;
    807   scoped_ptr<PacketPipe> from_dest_pipe_;
    808 
    809   // For receiving.
    810   net::IPEndPoint recv_address_;
    811   scoped_ptr<Packet> packet_;
    812 
    813   // For sending.
    814   bool blocked_;
    815 
    816   base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
    817 };
    818 
    819 void PacketSender::Send(scoped_ptr<Packet> packet) {
    820   udp_proxy_->Send(packet.Pass(), *destination_);
    821 }
    822 
    823 scoped_ptr<UDPProxy> UDPProxy::Create(
    824     const net::IPEndPoint& local_port,
    825     const net::IPEndPoint& destination,
    826     scoped_ptr<PacketPipe> to_dest_pipe,
    827     scoped_ptr<PacketPipe> from_dest_pipe,
    828     net::NetLog* net_log) {
    829   scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
    830                                             destination,
    831                                             to_dest_pipe.Pass(),
    832                                             from_dest_pipe.Pass(),
    833                                             net_log));
    834   return ret.Pass();
    835 }
    836 
    837 }  // namespace test
    838 }  // namespace cast
    839 }  // namespace media
    840