1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // MSVC++ requires this to be set before any other includes to get M_PI. 6 #define _USE_MATH_DEFINES 7 8 #include "remoting/test/fake_socket_factory.h" 9 10 #include <math.h> 11 12 #include "base/bind.h" 13 #include "base/callback.h" 14 #include "base/location.h" 15 #include "base/rand_util.h" 16 #include "base/single_thread_task_runner.h" 17 #include "base/thread_task_runner_handle.h" 18 #include "net/base/io_buffer.h" 19 #include "remoting/test/leaky_bucket.h" 20 #include "third_party/webrtc/base/asyncpacketsocket.h" 21 22 namespace remoting { 23 24 namespace { 25 26 const int kPortRangeStart = 1024; 27 const int kPortRangeEnd = 65535; 28 29 double GetNormalRandom(double average, double stddev) { 30 // Based on Box-Muller transform, see 31 // http://en.wikipedia.org/wiki/Box_Muller_transform . 32 return average + 33 stddev * sqrt(-2.0 * log(1.0 - base::RandDouble())) * 34 cos(base::RandDouble() * 2.0 * M_PI); 35 } 36 37 class FakeUdpSocket : public rtc::AsyncPacketSocket { 38 public: 39 FakeUdpSocket(FakePacketSocketFactory* factory, 40 scoped_refptr<FakeNetworkDispatcher> dispatcher, 41 const rtc::SocketAddress& local_address); 42 virtual ~FakeUdpSocket(); 43 44 void ReceivePacket(const rtc::SocketAddress& from, 45 const rtc::SocketAddress& to, 46 const scoped_refptr<net::IOBuffer>& data, 47 int data_size); 48 49 // rtc::AsyncPacketSocket interface. 50 virtual rtc::SocketAddress GetLocalAddress() const OVERRIDE; 51 virtual rtc::SocketAddress GetRemoteAddress() const OVERRIDE; 52 virtual int Send(const void* data, size_t data_size, 53 const rtc::PacketOptions& options) OVERRIDE; 54 virtual int SendTo(const void* data, size_t data_size, 55 const rtc::SocketAddress& address, 56 const rtc::PacketOptions& options) OVERRIDE; 57 virtual int Close() OVERRIDE; 58 virtual State GetState() const OVERRIDE; 59 virtual int GetOption(rtc::Socket::Option option, int* value) OVERRIDE; 60 virtual int SetOption(rtc::Socket::Option option, int value) OVERRIDE; 61 virtual int GetError() const OVERRIDE; 62 virtual void SetError(int error) OVERRIDE; 63 64 private: 65 FakePacketSocketFactory* factory_; 66 scoped_refptr<FakeNetworkDispatcher> dispatcher_; 67 rtc::SocketAddress local_address_; 68 State state_; 69 70 DISALLOW_COPY_AND_ASSIGN(FakeUdpSocket); 71 }; 72 73 FakeUdpSocket::FakeUdpSocket(FakePacketSocketFactory* factory, 74 scoped_refptr<FakeNetworkDispatcher> dispatcher, 75 const rtc::SocketAddress& local_address) 76 : factory_(factory), 77 dispatcher_(dispatcher), 78 local_address_(local_address), 79 state_(STATE_BOUND) { 80 } 81 82 FakeUdpSocket::~FakeUdpSocket() { 83 factory_->OnSocketDestroyed(local_address_.port()); 84 } 85 86 void FakeUdpSocket::ReceivePacket(const rtc::SocketAddress& from, 87 const rtc::SocketAddress& to, 88 const scoped_refptr<net::IOBuffer>& data, 89 int data_size) { 90 SignalReadPacket( 91 this, data->data(), data_size, from, rtc::CreatePacketTime(0)); 92 } 93 94 rtc::SocketAddress FakeUdpSocket::GetLocalAddress() const { 95 return local_address_; 96 } 97 98 rtc::SocketAddress FakeUdpSocket::GetRemoteAddress() const { 99 NOTREACHED(); 100 return rtc::SocketAddress(); 101 } 102 103 int FakeUdpSocket::Send(const void* data, size_t data_size, 104 const rtc::PacketOptions& options) { 105 NOTREACHED(); 106 return EINVAL; 107 } 108 109 int FakeUdpSocket::SendTo(const void* data, size_t data_size, 110 const rtc::SocketAddress& address, 111 const rtc::PacketOptions& options) { 112 scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(data_size); 113 memcpy(buffer->data(), data, data_size); 114 dispatcher_->DeliverPacket(local_address_, address, buffer, data_size); 115 return data_size; 116 } 117 118 int FakeUdpSocket::Close() { 119 state_ = STATE_CLOSED; 120 return 0; 121 } 122 123 rtc::AsyncPacketSocket::State FakeUdpSocket::GetState() const { 124 return state_; 125 } 126 127 int FakeUdpSocket::GetOption(rtc::Socket::Option option, int* value) { 128 NOTIMPLEMENTED(); 129 return -1; 130 } 131 132 int FakeUdpSocket::SetOption(rtc::Socket::Option option, int value) { 133 NOTIMPLEMENTED(); 134 return -1; 135 } 136 137 int FakeUdpSocket::GetError() const { 138 return 0; 139 } 140 141 void FakeUdpSocket::SetError(int error) { 142 NOTREACHED(); 143 } 144 145 } // namespace 146 147 FakePacketSocketFactory::PendingPacket::PendingPacket() 148 : data_size(0) { 149 } 150 151 FakePacketSocketFactory::PendingPacket::PendingPacket( 152 const rtc::SocketAddress& from, 153 const rtc::SocketAddress& to, 154 const scoped_refptr<net::IOBuffer>& data, 155 int data_size) 156 : from(from), to(to), data(data), data_size(data_size) { 157 } 158 159 FakePacketSocketFactory::PendingPacket::~PendingPacket() { 160 } 161 162 FakePacketSocketFactory::FakePacketSocketFactory( 163 FakeNetworkDispatcher* dispatcher) 164 : task_runner_(base::ThreadTaskRunnerHandle::Get()), 165 dispatcher_(dispatcher), 166 address_(dispatcher_->AllocateAddress()), 167 out_of_order_rate_(0.0), 168 next_port_(kPortRangeStart), 169 weak_factory_(this) { 170 dispatcher_->AddNode(this); 171 } 172 173 FakePacketSocketFactory::~FakePacketSocketFactory() { 174 CHECK(udp_sockets_.empty()); 175 dispatcher_->RemoveNode(this); 176 } 177 178 void FakePacketSocketFactory::OnSocketDestroyed(int port) { 179 DCHECK(task_runner_->BelongsToCurrentThread()); 180 udp_sockets_.erase(port); 181 } 182 183 void FakePacketSocketFactory::SetBandwidth(int bandwidth, int max_buffer) { 184 DCHECK(task_runner_->BelongsToCurrentThread()); 185 if (bandwidth <= 0) { 186 leaky_bucket_.reset(); 187 } else { 188 leaky_bucket_.reset(new LeakyBucket(max_buffer, bandwidth)); 189 } 190 } 191 192 void FakePacketSocketFactory::SetLatency(base::TimeDelta average, 193 base::TimeDelta stddev) { 194 DCHECK(task_runner_->BelongsToCurrentThread()); 195 latency_average_ = average; 196 latency_stddev_ = stddev; 197 } 198 199 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateUdpSocket( 200 const rtc::SocketAddress& local_address, 201 int min_port, int max_port) { 202 DCHECK(task_runner_->BelongsToCurrentThread()); 203 204 int port = -1; 205 if (min_port > 0 && max_port > 0) { 206 for (int i = min_port; i <= max_port; ++i) { 207 if (udp_sockets_.find(i) == udp_sockets_.end()) { 208 port = i; 209 break; 210 } 211 } 212 if (port < 0) 213 return NULL; 214 } else { 215 do { 216 port = next_port_; 217 next_port_ = 218 (next_port_ >= kPortRangeEnd) ? kPortRangeStart : (next_port_ + 1); 219 } while (udp_sockets_.find(port) != udp_sockets_.end()); 220 } 221 222 CHECK(local_address.ipaddr() == address_); 223 224 FakeUdpSocket* result = 225 new FakeUdpSocket(this, dispatcher_, 226 rtc::SocketAddress(local_address.ipaddr(), port)); 227 228 udp_sockets_[port] = 229 base::Bind(&FakeUdpSocket::ReceivePacket, base::Unretained(result)); 230 231 return result; 232 } 233 234 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateServerTcpSocket( 235 const rtc::SocketAddress& local_address, 236 int min_port, int max_port, 237 int opts) { 238 return NULL; 239 } 240 241 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateClientTcpSocket( 242 const rtc::SocketAddress& local_address, 243 const rtc::SocketAddress& remote_address, 244 const rtc::ProxyInfo& proxy_info, 245 const std::string& user_agent, 246 int opts) { 247 return NULL; 248 } 249 250 rtc::AsyncResolverInterface* 251 FakePacketSocketFactory::CreateAsyncResolver() { 252 return NULL; 253 } 254 255 const scoped_refptr<base::SingleThreadTaskRunner>& 256 FakePacketSocketFactory::GetThread() const { 257 return task_runner_; 258 } 259 260 const rtc::IPAddress& FakePacketSocketFactory::GetAddress() const { 261 return address_; 262 } 263 264 void FakePacketSocketFactory::ReceivePacket( 265 const rtc::SocketAddress& from, 266 const rtc::SocketAddress& to, 267 const scoped_refptr<net::IOBuffer>& data, 268 int data_size) { 269 DCHECK(task_runner_->BelongsToCurrentThread()); 270 DCHECK(to.ipaddr() == address_); 271 272 base::TimeDelta delay; 273 274 if (leaky_bucket_) { 275 delay = leaky_bucket_->AddPacket(data_size); 276 if (delay.is_max()) { 277 // Drop the packet. 278 return; 279 } 280 } 281 282 if (latency_average_ > base::TimeDelta()) { 283 delay += base::TimeDelta::FromMillisecondsD( 284 GetNormalRandom(latency_average_.InMillisecondsF(), 285 latency_stddev_.InMillisecondsF())); 286 } 287 if (delay < base::TimeDelta()) 288 delay = base::TimeDelta(); 289 290 // Put the packet to the |pending_packets_| and post a task for 291 // DoReceivePackets(). Note that the DoReceivePackets() task posted here may 292 // deliver a different packet, not the one added to the queue here. This 293 // would happen if another task gets posted with a shorted delay or when 294 // |out_of_order_rate_| is greater than 0. It's implemented this way to 295 // decouple latency variability from out-of-order delivery. 296 PendingPacket packet(from, to, data, data_size); 297 pending_packets_.push_back(packet); 298 task_runner_->PostDelayedTask( 299 FROM_HERE, 300 base::Bind(&FakePacketSocketFactory::DoReceivePacket, 301 weak_factory_.GetWeakPtr()), 302 delay); 303 } 304 305 void FakePacketSocketFactory::DoReceivePacket() { 306 DCHECK(task_runner_->BelongsToCurrentThread()); 307 308 PendingPacket packet; 309 if (pending_packets_.size() > 1 && base::RandDouble() < out_of_order_rate_) { 310 std::list<PendingPacket>::iterator it = pending_packets_.begin(); 311 ++it; 312 packet = *it; 313 pending_packets_.erase(it); 314 } else { 315 packet = pending_packets_.front(); 316 pending_packets_.pop_front(); 317 } 318 319 UdpSocketsMap::iterator iter = udp_sockets_.find(packet.to.port()); 320 if (iter == udp_sockets_.end()) { 321 // Invalid port number. 322 return; 323 } 324 325 iter->second.Run(packet.from, packet.to, packet.data, packet.data_size); 326 } 327 328 } // namespace remoting 329