1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "media/cast/test/transport/transport.h" 6 7 #include <string> 8 9 #include "base/bind.h" 10 #include "base/logging.h" 11 #include "base/memory/ref_counted.h" 12 #include "base/memory/scoped_ptr.h" 13 #include "base/message_loop/message_loop.h" 14 #include "base/rand_util.h" 15 #include "net/base/io_buffer.h" 16 #include "net/base/rand_callback.h" 17 #include "net/base/test_completion_callback.h" 18 19 namespace media { 20 namespace cast { 21 namespace test { 22 23 const int kMaxPacketSize = 1500; 24 25 class LocalUdpTransportData; 26 27 void CreateUDPAddress(std::string ip_str, int port, net::IPEndPoint* address) { 28 net::IPAddressNumber ip_number; 29 bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number); 30 if (!rv) 31 return; 32 *address = net::IPEndPoint(ip_number, port); 33 } 34 35 class LocalUdpTransportData 36 : public base::RefCountedThreadSafe<LocalUdpTransportData> { 37 public: 38 LocalUdpTransportData(net::UDPServerSocket* udp_socket, 39 scoped_refptr<base::TaskRunner> io_thread_proxy) 40 : udp_socket_(udp_socket), 41 buffer_(new net::IOBufferWithSize(kMaxPacketSize)), 42 io_thread_proxy_(io_thread_proxy) { 43 } 44 45 void ListenTo(net::IPEndPoint bind_address) { 46 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 47 48 bind_address_ = bind_address; 49 io_thread_proxy_->PostTask(FROM_HERE, 50 base::Bind(&LocalUdpTransportData::RecvFromSocketLoop, this)); 51 } 52 53 void DeletePacket(uint8* data) { 54 // Should be called from the receiver (not on the transport thread). 55 DCHECK(!(io_thread_proxy_->RunsTasksOnCurrentThread())); 56 delete [] data; 57 } 58 59 void PacketReceived(int size) { 60 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 61 // Got a packet with length result. 62 uint8* data = new uint8[size]; 63 memcpy(data, buffer_->data(), size); 64 packet_receiver_->ReceivedPacket(data, size, 65 base::Bind(&LocalUdpTransportData::DeletePacket, this, data)); 66 RecvFromSocketLoop(); 67 68 } 69 70 void RecvFromSocketLoop() { 71 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 72 // Callback should always trigger with a packet. 73 int res = udp_socket_->RecvFrom(buffer_.get(), kMaxPacketSize, 74 &bind_address_, base::Bind(&LocalUdpTransportData::PacketReceived, 75 this)); 76 DCHECK(res >= net::ERR_IO_PENDING); 77 if (res > 0) { 78 PacketReceived(res); 79 } 80 } 81 82 void set_packet_receiver(PacketReceiver* packet_receiver) { 83 packet_receiver_ = packet_receiver; 84 } 85 86 void Close() { 87 udp_socket_->Close(); 88 } 89 90 protected: 91 virtual ~LocalUdpTransportData() {} 92 93 private: 94 friend class base::RefCountedThreadSafe<LocalUdpTransportData>; 95 96 net::UDPServerSocket* udp_socket_; 97 net::IPEndPoint bind_address_; 98 PacketReceiver* packet_receiver_; 99 scoped_refptr<net::IOBufferWithSize> buffer_; 100 scoped_refptr<base::TaskRunner> io_thread_proxy_; 101 102 DISALLOW_COPY_AND_ASSIGN(LocalUdpTransportData); 103 }; 104 105 class LocalPacketSender : public PacketSender, 106 public base::RefCountedThreadSafe<LocalPacketSender> { 107 public: 108 LocalPacketSender(net::UDPServerSocket* udp_socket, 109 scoped_refptr<base::TaskRunner> io_thread_proxy) 110 : udp_socket_(udp_socket), 111 send_address_(), 112 loss_limit_(0), 113 io_thread_proxy_(io_thread_proxy) {} 114 115 virtual bool SendPacket(const Packet& packet) OVERRIDE { 116 io_thread_proxy_->PostTask(FROM_HERE, 117 base::Bind(&LocalPacketSender::SendPacketToNetwork, this, packet)); 118 return true; 119 } 120 121 virtual void SendPacketToNetwork(const Packet& packet) { 122 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 123 const uint8* data = packet.data(); 124 if (loss_limit_ > 0) { 125 int r = base::RandInt(0, 100); 126 if (r < loss_limit_) { 127 VLOG(1) << "Drop packet f:" << static_cast<int>(data[12 + 1]) 128 << " p:" << static_cast<int>(data[12 + 3]) 129 << " m:" << static_cast<int>(data[12 + 5]); 130 return; 131 } 132 } 133 net::TestCompletionCallback callback; 134 scoped_refptr<net::WrappedIOBuffer> buffer( 135 new net::WrappedIOBuffer(reinterpret_cast<const char*>(data))); 136 udp_socket_->SendTo(buffer.get(), static_cast<int>(packet.size()), 137 send_address_, callback.callback()); 138 } 139 140 virtual bool SendPackets(const PacketList& packets) OVERRIDE { 141 bool out_val = true; 142 for (size_t i = 0; i < packets.size(); ++i) { 143 const Packet& packet = packets[i]; 144 out_val |= SendPacket(packet); 145 } 146 return out_val; 147 } 148 149 void SetPacketLoss(int percentage) { 150 DCHECK_GE(percentage, 0); 151 DCHECK_LT(percentage, 100); 152 loss_limit_ = percentage; 153 } 154 155 void SetSendAddress(const net::IPEndPoint& send_address) { 156 send_address_ = send_address; 157 } 158 159 protected: 160 virtual ~LocalPacketSender() {} 161 162 private: 163 friend class base::RefCountedThreadSafe<LocalPacketSender>; 164 165 net::UDPServerSocket* udp_socket_; // Not owned by this class. 166 net::IPEndPoint send_address_; 167 int loss_limit_; 168 scoped_refptr<base::TaskRunner> io_thread_proxy_; 169 }; 170 171 Transport::Transport( 172 scoped_refptr<base::TaskRunner> io_thread_proxy) 173 : udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())), 174 local_udp_transport_data_(new LocalUdpTransportData(udp_socket_.get(), 175 io_thread_proxy)), 176 packet_sender_(new LocalPacketSender(udp_socket_.get(), io_thread_proxy)), 177 io_thread_proxy_(io_thread_proxy) {} 178 179 Transport::~Transport() {} 180 181 PacketSender* Transport::packet_sender() { 182 return static_cast<PacketSender*>(packet_sender_.get()); 183 } 184 185 void Transport::SetSendSidePacketLoss(int percentage) { 186 packet_sender_->SetPacketLoss(percentage); 187 } 188 189 void Transport::StopReceiving() { 190 local_udp_transport_data_->Close(); 191 } 192 193 void Transport::SetLocalReceiver(PacketReceiver* packet_receiver, 194 std::string ip_address, 195 std::string local_ip_address, 196 int port) { 197 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 198 net::IPEndPoint bind_address, local_bind_address; 199 CreateUDPAddress(ip_address, port, &bind_address); 200 CreateUDPAddress(local_ip_address, port, &local_bind_address); 201 local_udp_transport_data_->set_packet_receiver(packet_receiver); 202 udp_socket_->AllowAddressReuse(); 203 udp_socket_->SetMulticastLoopbackMode(true); 204 udp_socket_->Listen(local_bind_address); 205 206 // Start listening once receiver has been set. 207 local_udp_transport_data_->ListenTo(bind_address); 208 } 209 210 void Transport::SetSendDestination(std::string ip_address, int port) { 211 net::IPEndPoint send_address; 212 CreateUDPAddress(ip_address, port, &send_address); 213 packet_sender_->SetSendAddress(send_address); 214 } 215 216 } // namespace test 217 } // namespace cast 218 } // namespace media 219