Home | History | Annotate | Download | only in transport
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "media/cast/test/transport/transport.h"
      6 
      7 #include <string>
      8 
      9 #include "base/bind.h"
     10 #include "base/logging.h"
     11 #include "base/memory/ref_counted.h"
     12 #include "base/memory/scoped_ptr.h"
     13 #include "base/message_loop/message_loop.h"
     14 #include "base/rand_util.h"
     15 #include "net/base/io_buffer.h"
     16 #include "net/base/rand_callback.h"
     17 #include "net/base/test_completion_callback.h"
     18 
     19 namespace media {
     20 namespace cast {
     21 namespace test {
     22 
     23 const int kMaxPacketSize = 1500;
     24 
     25 class LocalUdpTransportData;
     26 
     27 void CreateUDPAddress(std::string ip_str, int port, net::IPEndPoint* address) {
     28   net::IPAddressNumber ip_number;
     29   bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number);
     30   if (!rv)
     31     return;
     32   *address = net::IPEndPoint(ip_number, port);
     33 }
     34 
     35 class LocalUdpTransportData
     36     : public base::RefCountedThreadSafe<LocalUdpTransportData> {
     37  public:
     38   LocalUdpTransportData(net::UDPServerSocket* udp_socket,
     39                         scoped_refptr<base::TaskRunner> io_thread_proxy)
     40     : udp_socket_(udp_socket),
     41       buffer_(new net::IOBufferWithSize(kMaxPacketSize)),
     42       io_thread_proxy_(io_thread_proxy) {
     43   }
     44 
     45   void ListenTo(net::IPEndPoint bind_address) {
     46     DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
     47 
     48     bind_address_ = bind_address;
     49     io_thread_proxy_->PostTask(FROM_HERE,
     50         base::Bind(&LocalUdpTransportData::RecvFromSocketLoop, this));
     51   }
     52 
     53   void DeletePacket(uint8* data) {
     54     // Should be called from the receiver (not on the transport thread).
     55     DCHECK(!(io_thread_proxy_->RunsTasksOnCurrentThread()));
     56     delete [] data;
     57   }
     58 
     59   void PacketReceived(int size) {
     60     DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
     61     // Got a packet with length result.
     62     uint8* data = new uint8[size];
     63     memcpy(data, buffer_->data(), size);
     64     packet_receiver_->ReceivedPacket(data, size,
     65         base::Bind(&LocalUdpTransportData::DeletePacket, this, data));
     66     RecvFromSocketLoop();
     67 
     68   }
     69 
     70   void RecvFromSocketLoop() {
     71     DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
     72     // Callback should always trigger with a packet.
     73     int res = udp_socket_->RecvFrom(buffer_.get(), kMaxPacketSize,
     74         &bind_address_, base::Bind(&LocalUdpTransportData::PacketReceived,
     75         this));
     76     DCHECK(res >= net::ERR_IO_PENDING);
     77     if (res > 0) {
     78       PacketReceived(res);
     79     }
     80   }
     81 
     82   void set_packet_receiver(PacketReceiver* packet_receiver) {
     83     packet_receiver_ = packet_receiver;
     84   }
     85 
     86   void Close() {
     87     udp_socket_->Close();
     88   }
     89 
     90  protected:
     91   virtual ~LocalUdpTransportData() {}
     92 
     93  private:
     94   friend class base::RefCountedThreadSafe<LocalUdpTransportData>;
     95 
     96   net::UDPServerSocket* udp_socket_;
     97   net::IPEndPoint bind_address_;
     98   PacketReceiver* packet_receiver_;
     99   scoped_refptr<net::IOBufferWithSize> buffer_;
    100   scoped_refptr<base::TaskRunner> io_thread_proxy_;
    101 
    102   DISALLOW_COPY_AND_ASSIGN(LocalUdpTransportData);
    103 };
    104 
    105 class LocalPacketSender : public PacketSender,
    106                           public base::RefCountedThreadSafe<LocalPacketSender> {
    107  public:
    108   LocalPacketSender(net::UDPServerSocket* udp_socket,
    109                     scoped_refptr<base::TaskRunner> io_thread_proxy)
    110       : udp_socket_(udp_socket),
    111         send_address_(),
    112         loss_limit_(0),
    113         io_thread_proxy_(io_thread_proxy) {}
    114 
    115   virtual bool SendPacket(const Packet& packet) OVERRIDE {
    116     io_thread_proxy_->PostTask(FROM_HERE,
    117     base::Bind(&LocalPacketSender::SendPacketToNetwork, this, packet));
    118     return true;
    119   }
    120 
    121   virtual void SendPacketToNetwork(const Packet& packet) {
    122     DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
    123     const uint8* data = packet.data();
    124     if (loss_limit_ > 0) {
    125       int r = base::RandInt(0, 100);
    126       if (r < loss_limit_) {
    127         VLOG(1) << "Drop packet f:" << static_cast<int>(data[12 + 1])
    128                 << " p:" << static_cast<int>(data[12 + 3])
    129                 << " m:" << static_cast<int>(data[12 + 5]);
    130         return;
    131       }
    132     }
    133     net::TestCompletionCallback callback;
    134     scoped_refptr<net::WrappedIOBuffer> buffer(
    135         new net::WrappedIOBuffer(reinterpret_cast<const char*>(data)));
    136     udp_socket_->SendTo(buffer.get(), static_cast<int>(packet.size()),
    137                         send_address_, callback.callback());
    138   }
    139 
    140   virtual bool SendPackets(const PacketList& packets) OVERRIDE {
    141     bool out_val = true;
    142     for (size_t i = 0; i < packets.size(); ++i) {
    143       const Packet& packet = packets[i];
    144       out_val |= SendPacket(packet);
    145     }
    146     return out_val;
    147   }
    148 
    149   void SetPacketLoss(int percentage) {
    150     DCHECK_GE(percentage, 0);
    151     DCHECK_LT(percentage, 100);
    152     loss_limit_ = percentage;
    153   }
    154 
    155   void SetSendAddress(const net::IPEndPoint& send_address) {
    156     send_address_ = send_address;
    157   }
    158 
    159  protected:
    160   virtual ~LocalPacketSender() {}
    161 
    162  private:
    163   friend class base::RefCountedThreadSafe<LocalPacketSender>;
    164 
    165   net::UDPServerSocket* udp_socket_;  // Not owned by this class.
    166   net::IPEndPoint send_address_;
    167   int loss_limit_;
    168   scoped_refptr<base::TaskRunner> io_thread_proxy_;
    169 };
    170 
    171 Transport::Transport(
    172     scoped_refptr<base::TaskRunner> io_thread_proxy)
    173     : udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())),
    174       local_udp_transport_data_(new LocalUdpTransportData(udp_socket_.get(),
    175                                                           io_thread_proxy)),
    176       packet_sender_(new LocalPacketSender(udp_socket_.get(), io_thread_proxy)),
    177       io_thread_proxy_(io_thread_proxy) {}
    178 
    179 Transport::~Transport() {}
    180 
    181 PacketSender* Transport::packet_sender() {
    182   return static_cast<PacketSender*>(packet_sender_.get());
    183 }
    184 
    185 void Transport::SetSendSidePacketLoss(int percentage) {
    186   packet_sender_->SetPacketLoss(percentage);
    187 }
    188 
    189 void Transport::StopReceiving() {
    190   local_udp_transport_data_->Close();
    191 }
    192 
    193 void Transport::SetLocalReceiver(PacketReceiver* packet_receiver,
    194                                  std::string ip_address,
    195                                  std::string local_ip_address,
    196                                  int port) {
    197   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
    198   net::IPEndPoint bind_address, local_bind_address;
    199   CreateUDPAddress(ip_address, port, &bind_address);
    200   CreateUDPAddress(local_ip_address, port, &local_bind_address);
    201   local_udp_transport_data_->set_packet_receiver(packet_receiver);
    202   udp_socket_->AllowAddressReuse();
    203   udp_socket_->SetMulticastLoopbackMode(true);
    204   udp_socket_->Listen(local_bind_address);
    205 
    206   // Start listening once receiver has been set.
    207   local_udp_transport_data_->ListenTo(bind_address);
    208 }
    209 
    210 void Transport::SetSendDestination(std::string ip_address, int port) {
    211   net::IPEndPoint send_address;
    212   CreateUDPAddress(ip_address, port, &send_address);
    213   packet_sender_->SetSendAddress(send_address);
    214 }
    215 
    216 }  // namespace test
    217 }  // namespace cast
    218 }  // namespace media
    219