1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <stdlib.h> 6 7 #include "media/cast/test/utility/udp_proxy.h" 8 9 #include "base/logging.h" 10 #include "base/memory/linked_ptr.h" 11 #include "base/rand_util.h" 12 #include "base/synchronization/waitable_event.h" 13 #include "base/threading/thread.h" 14 #include "base/time/default_tick_clock.h" 15 #include "net/base/io_buffer.h" 16 #include "net/base/net_errors.h" 17 #include "net/udp/udp_socket.h" 18 19 namespace media { 20 namespace cast { 21 namespace test { 22 23 const size_t kMaxPacketSize = 65536; 24 25 PacketPipe::PacketPipe() {} 26 PacketPipe::~PacketPipe() {} 27 void PacketPipe::InitOnIOThread( 28 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 29 base::TickClock* clock) { 30 task_runner_ = task_runner; 31 clock_ = clock; 32 if (pipe_) { 33 pipe_->InitOnIOThread(task_runner, clock); 34 } 35 } 36 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) { 37 if (pipe_) { 38 pipe_->AppendToPipe(pipe.Pass()); 39 } else { 40 pipe_ = pipe.Pass(); 41 } 42 } 43 44 // Roughly emulates a buffer inside a device. 45 // If the buffer is full, packets are dropped. 46 // Packets are output at a maximum bandwidth. 47 class Buffer : public PacketPipe { 48 public: 49 Buffer(size_t buffer_size, double max_megabits_per_second) 50 : buffer_size_(0), 51 max_buffer_size_(buffer_size), 52 max_megabits_per_second_(max_megabits_per_second), 53 weak_factory_(this) { 54 CHECK_GT(max_buffer_size_, 0UL); 55 CHECK_GT(max_megabits_per_second, 0); 56 } 57 58 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { 59 if (packet->size() + buffer_size_ <= max_buffer_size_) { 60 buffer_size_ += packet->size(); 61 buffer_.push_back(linked_ptr<transport::Packet>(packet.release())); 62 if (buffer_.size() == 1) { 63 Schedule(); 64 } 65 } 66 } 67 68 private: 69 void Schedule() { 70 double megabits = buffer_.front()->size() * 8 / 1000000.0; 71 double seconds = megabits / max_megabits_per_second_; 72 int64 microseconds = static_cast<int64>(seconds * 1E6); 73 task_runner_->PostDelayedTask( 74 FROM_HERE, 75 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()), 76 base::TimeDelta::FromMicroseconds(microseconds)); 77 } 78 79 void ProcessBuffer() { 80 CHECK(!buffer_.empty()); 81 scoped_ptr<transport::Packet> packet(buffer_.front().release()); 82 buffer_size_ -= packet->size(); 83 buffer_.pop_front(); 84 pipe_->Send(packet.Pass()); 85 if (!buffer_.empty()) { 86 Schedule(); 87 } 88 } 89 90 std::deque<linked_ptr<transport::Packet> > buffer_; 91 size_t buffer_size_; 92 size_t max_buffer_size_; 93 double max_megabits_per_second_; // megabits per second 94 base::WeakPtrFactory<Buffer> weak_factory_; 95 }; 96 97 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) { 98 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass(); 99 } 100 101 class RandomDrop : public PacketPipe { 102 public: 103 RandomDrop(double drop_fraction) 104 : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {} 105 106 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { 107 if (rand() > drop_fraction_) { 108 pipe_->Send(packet.Pass()); 109 } 110 } 111 112 private: 113 int drop_fraction_; 114 }; 115 116 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) { 117 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass(); 118 } 119 120 class SimpleDelayBase : public PacketPipe { 121 public: 122 SimpleDelayBase() : weak_factory_(this) {} 123 virtual ~SimpleDelayBase() {} 124 125 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { 126 double seconds = GetDelay(); 127 task_runner_->PostDelayedTask( 128 FROM_HERE, 129 base::Bind(&SimpleDelayBase::SendInternal, 130 weak_factory_.GetWeakPtr(), 131 base::Passed(&packet)), 132 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6))); 133 } 134 protected: 135 virtual double GetDelay() = 0; 136 137 private: 138 virtual void SendInternal(scoped_ptr<transport::Packet> packet) { 139 pipe_->Send(packet.Pass()); 140 } 141 142 base::WeakPtrFactory<SimpleDelayBase> weak_factory_; 143 }; 144 145 class ConstantDelay : public SimpleDelayBase { 146 public: 147 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {} 148 virtual double GetDelay() OVERRIDE { 149 return delay_seconds_; 150 } 151 152 private: 153 double delay_seconds_; 154 }; 155 156 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) { 157 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass(); 158 } 159 160 class RandomUnsortedDelay : public SimpleDelayBase { 161 public: 162 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {} 163 164 virtual double GetDelay() OVERRIDE { 165 return random_delay_ * base::RandDouble(); 166 } 167 168 private: 169 double random_delay_; 170 }; 171 172 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) { 173 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass(); 174 } 175 176 177 class RandomSortedDelay : public PacketPipe { 178 public: 179 RandomSortedDelay(double random_delay, 180 double extra_delay, 181 double seconds_between_extra_delay) 182 : random_delay_(random_delay), 183 extra_delay_(extra_delay), 184 seconds_between_extra_delay_(seconds_between_extra_delay), 185 weak_factory_(this) {} 186 187 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { 188 buffer_.push_back(linked_ptr<transport::Packet>(packet.release())); 189 if (buffer_.size() == 1) { 190 Schedule(); 191 } 192 } 193 virtual void InitOnIOThread( 194 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 195 base::TickClock* clock) OVERRIDE { 196 PacketPipe::InitOnIOThread(task_runner, clock); 197 // As we start the stream, assume that we are in a random 198 // place between two extra delays, thus multiplier = 1.0; 199 ScheduleExtraDelay(1.0); 200 } 201 202 private: 203 void ScheduleExtraDelay(double mult) { 204 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble(); 205 int64 microseconds = static_cast<int64>(seconds * 1E6); 206 task_runner_->PostDelayedTask( 207 FROM_HERE, 208 base::Bind(&RandomSortedDelay::CauseExtraDelay, 209 weak_factory_.GetWeakPtr()), 210 base::TimeDelta::FromMicroseconds(microseconds)); 211 } 212 213 void CauseExtraDelay() { 214 block_until_ = clock_->NowTicks() + 215 base::TimeDelta::FromMicroseconds( 216 static_cast<int64>(extra_delay_ * 1E6)); 217 // An extra delay just happened, wait up to seconds_between_extra_delay_*2 218 // before scheduling another one to make the average equal to 219 // seconds_between_extra_delay_. 220 ScheduleExtraDelay(2.0); 221 } 222 223 void Schedule() { 224 double seconds = base::RandDouble() * random_delay_; 225 base::TimeDelta block_time = block_until_ - base::TimeTicks::Now(); 226 base::TimeDelta delay_time = 227 base::TimeDelta::FromMicroseconds( 228 static_cast<int64>(seconds * 1E6)); 229 if (block_time > delay_time) { 230 block_time = delay_time; 231 } 232 233 task_runner_->PostDelayedTask(FROM_HERE, 234 base::Bind(&RandomSortedDelay::ProcessBuffer, 235 weak_factory_.GetWeakPtr()), 236 delay_time); 237 } 238 239 void ProcessBuffer() { 240 CHECK(!buffer_.empty()); 241 scoped_ptr<transport::Packet> packet(buffer_.front().release()); 242 pipe_->Send(packet.Pass()); 243 buffer_.pop_front(); 244 if (!buffer_.empty()) { 245 Schedule(); 246 } 247 } 248 249 base::TimeTicks block_until_; 250 std::deque<linked_ptr<transport::Packet> > buffer_; 251 double random_delay_; 252 double extra_delay_; 253 double seconds_between_extra_delay_; 254 base::WeakPtrFactory<RandomSortedDelay> weak_factory_; 255 }; 256 257 scoped_ptr<PacketPipe> NewRandomSortedDelay( 258 double random_delay, 259 double extra_delay, 260 double seconds_between_extra_delay) { 261 return scoped_ptr<PacketPipe>( 262 new RandomSortedDelay( 263 random_delay, extra_delay, seconds_between_extra_delay)) 264 .Pass(); 265 } 266 267 class NetworkGlitchPipe : public PacketPipe { 268 public: 269 NetworkGlitchPipe(double average_work_time, double average_outage_time) 270 : works_(false), 271 max_work_time_(average_work_time * 2), 272 max_outage_time_(average_outage_time * 2), 273 weak_factory_(this) {} 274 275 virtual void InitOnIOThread( 276 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 277 base::TickClock* clock) OVERRIDE { 278 PacketPipe::InitOnIOThread(task_runner, clock); 279 Flip(); 280 } 281 282 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE { 283 if (works_) { 284 pipe_->Send(packet.Pass()); 285 } 286 } 287 288 private: 289 void Flip() { 290 works_ = !works_; 291 double seconds = base::RandDouble() * 292 (works_ ? max_work_time_ : max_outage_time_); 293 int64 microseconds = static_cast<int64>(seconds * 1E6); 294 task_runner_->PostDelayedTask( 295 FROM_HERE, 296 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()), 297 base::TimeDelta::FromMicroseconds(microseconds)); 298 } 299 300 bool works_; 301 double max_work_time_; 302 double max_outage_time_; 303 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; 304 }; 305 306 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, 307 double average_outage_time) { 308 return scoped_ptr<PacketPipe>( 309 new NetworkGlitchPipe(average_work_time, average_outage_time)) 310 .Pass(); 311 } 312 313 class UDPProxyImpl; 314 315 class PacketSender : public PacketPipe { 316 public: 317 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination) 318 : udp_proxy_(udp_proxy), destination_(destination) {} 319 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE; 320 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { 321 NOTREACHED(); 322 } 323 324 private: 325 UDPProxyImpl* udp_proxy_; 326 const net::IPEndPoint* destination_; // not owned 327 }; 328 329 namespace { 330 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) { 331 if (*pipe) { 332 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass()); 333 } else { 334 pipe->reset(next); 335 } 336 } 337 } // namespace 338 339 scoped_ptr<PacketPipe> WifiNetwork() { 340 // This represents the buffer on the sender. 341 scoped_ptr<PacketPipe> pipe; 342 BuildPipe(&pipe, new Buffer(256 << 10, 20)); 343 BuildPipe(&pipe, new RandomDrop(0.005)); 344 // This represents the buffer on the router. 345 BuildPipe(&pipe, new ConstantDelay(1E-3)); 346 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3)); 347 BuildPipe(&pipe, new Buffer(256 << 10, 20)); 348 BuildPipe(&pipe, new ConstantDelay(1E-3)); 349 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3)); 350 BuildPipe(&pipe, new RandomDrop(0.005)); 351 // This represents the buffer on the receiving device. 352 BuildPipe(&pipe, new Buffer(256 << 10, 20)); 353 return pipe.Pass(); 354 } 355 356 scoped_ptr<PacketPipe> BadNetwork() { 357 scoped_ptr<PacketPipe> pipe; 358 // This represents the buffer on the sender. 359 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s 360 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop 361 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1)); 362 // This represents the buffer on the router. 363 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 4mbit/s 364 BuildPipe(&pipe, new ConstantDelay(1E-3)); 365 // Random 40ms every other second 366 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1)); 367 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3)); 368 // This represents the buffer on the receiving device. 369 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s 370 return pipe.Pass(); 371 } 372 373 374 scoped_ptr<PacketPipe> EvilNetwork() { 375 // This represents the buffer on the sender. 376 scoped_ptr<PacketPipe> pipe; 377 BuildPipe(&pipe, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s 378 // This represents the buffer on the router. 379 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop 380 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1)); 381 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s 382 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop 383 BuildPipe(&pipe, new ConstantDelay(1E-3)); 384 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3)); 385 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3)); 386 // This represents the buffer on the receiving device. 387 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s 388 return pipe.Pass(); 389 } 390 391 class UDPProxyImpl : public UDPProxy { 392 public: 393 UDPProxyImpl(const net::IPEndPoint& local_port, 394 const net::IPEndPoint& destination, 395 scoped_ptr<PacketPipe> to_dest_pipe, 396 scoped_ptr<PacketPipe> from_dest_pipe, 397 net::NetLog* net_log) 398 : local_port_(local_port), 399 destination_(destination), 400 destination_is_mutable_(destination.address().empty()), 401 proxy_thread_("media::cast::test::UdpProxy Thread"), 402 to_dest_pipe_(to_dest_pipe.Pass()), 403 from_dest_pipe_(from_dest_pipe.Pass()), 404 blocked_(false), 405 weak_factory_(this) { 406 proxy_thread_.StartWithOptions( 407 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); 408 base::WaitableEvent start_event(false, false); 409 proxy_thread_.message_loop_proxy()->PostTask( 410 FROM_HERE, 411 base::Bind(&UDPProxyImpl::Start, 412 base::Unretained(this), 413 base::Unretained(&start_event), 414 net_log)); 415 start_event.Wait(); 416 } 417 418 virtual ~UDPProxyImpl() { 419 base::WaitableEvent stop_event(false, false); 420 proxy_thread_.message_loop_proxy()->PostTask( 421 FROM_HERE, 422 base::Bind(&UDPProxyImpl::Stop, 423 base::Unretained(this), 424 base::Unretained(&stop_event))); 425 stop_event.Wait(); 426 proxy_thread_.Stop(); 427 } 428 429 void Send(scoped_ptr<transport::Packet> packet, 430 const net::IPEndPoint& destination) { 431 if (blocked_) { 432 LOG(ERROR) << "Cannot write packet right now: blocked"; 433 return; 434 } 435 436 VLOG(1) << "Sending packet, len = " << packet->size(); 437 // We ignore all problems, callbacks and errors. 438 // If it didn't work we just drop the packet at and call it a day. 439 scoped_refptr<net::IOBuffer> buf = 440 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front())); 441 size_t buf_size = packet->size(); 442 int result; 443 if (destination.address().empty()) { 444 VLOG(1) << "Destination has not been set yet."; 445 result = net::ERR_INVALID_ARGUMENT; 446 } else { 447 VLOG(1) << "Destination:" << destination.ToString(); 448 result = socket_->SendTo(buf, 449 static_cast<int>(buf_size), 450 destination, 451 base::Bind(&UDPProxyImpl::AllowWrite, 452 weak_factory_.GetWeakPtr(), 453 buf, 454 base::Passed(&packet))); 455 } 456 if (result == net::ERR_IO_PENDING) { 457 blocked_ = true; 458 } else if (result < 0) { 459 LOG(ERROR) << "Failed to write packet."; 460 } 461 } 462 463 private: 464 void Start(base::WaitableEvent* start_event, 465 net::NetLog* net_log) { 466 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, 467 net::RandIntCallback(), 468 net_log, 469 net::NetLog::Source())); 470 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_)); 471 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_)); 472 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 473 &tick_clock_); 474 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 475 &tick_clock_); 476 477 VLOG(0) << "From:" << local_port_.ToString(); 478 if (!destination_is_mutable_) 479 VLOG(0) << "To:" << destination_.ToString(); 480 481 CHECK_GE(socket_->Bind(local_port_), 0); 482 483 start_event->Signal(); 484 PollRead(); 485 } 486 487 void Stop(base::WaitableEvent* stop_event) { 488 to_dest_pipe_.reset(NULL); 489 from_dest_pipe_.reset(NULL); 490 socket_.reset(NULL); 491 stop_event->Signal(); 492 } 493 494 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) { 495 DCHECK_NE(len, net::ERR_IO_PENDING); 496 VLOG(1) << "Got packet, len = " << len; 497 if (len < 0) { 498 LOG(WARNING) << "Socket read error: " << len; 499 return; 500 } 501 packet_->resize(len); 502 if (destination_is_mutable_ && set_destination_next_ && 503 !(recv_address_ == return_address_) && 504 !(recv_address_ == destination_)) { 505 destination_ = recv_address_; 506 } 507 if (recv_address_ == destination_) { 508 set_destination_next_ = false; 509 from_dest_pipe_->Send(packet_.Pass()); 510 } else { 511 set_destination_next_ = true; 512 VLOG(1) << "Return address = " << recv_address_.ToString(); 513 return_address_ = recv_address_; 514 to_dest_pipe_->Send(packet_.Pass()); 515 } 516 } 517 518 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) { 519 ProcessPacket(recv_buf, len); 520 PollRead(); 521 } 522 523 void PollRead() { 524 while (true) { 525 packet_.reset(new transport::Packet(kMaxPacketSize)); 526 scoped_refptr<net::IOBuffer> recv_buf = 527 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front())); 528 int len = socket_->RecvFrom( 529 recv_buf, 530 kMaxPacketSize, 531 &recv_address_, 532 base::Bind(&UDPProxyImpl::ReadCallback, 533 base::Unretained(this), 534 recv_buf)); 535 if (len == net::ERR_IO_PENDING) 536 break; 537 ProcessPacket(recv_buf, len); 538 } 539 } 540 541 void AllowWrite(scoped_refptr<net::IOBuffer> buf, 542 scoped_ptr<transport::Packet> packet, 543 int unused_len) { 544 DCHECK(blocked_); 545 blocked_ = false; 546 } 547 548 // Input 549 net::IPEndPoint local_port_; 550 551 net::IPEndPoint destination_; 552 bool destination_is_mutable_; 553 554 net::IPEndPoint return_address_; 555 bool set_destination_next_; 556 557 base::DefaultTickClock tick_clock_; 558 base::Thread proxy_thread_; 559 scoped_ptr<net::UDPSocket> socket_; 560 scoped_ptr<PacketPipe> to_dest_pipe_; 561 scoped_ptr<PacketPipe> from_dest_pipe_; 562 563 // For receiving. 564 net::IPEndPoint recv_address_; 565 scoped_ptr<transport::Packet> packet_; 566 567 // For sending. 568 bool blocked_; 569 570 base::WeakPtrFactory<UDPProxyImpl> weak_factory_; 571 }; 572 573 void PacketSender::Send(scoped_ptr<transport::Packet> packet) { 574 udp_proxy_->Send(packet.Pass(), *destination_); 575 } 576 577 scoped_ptr<UDPProxy> UDPProxy::Create( 578 const net::IPEndPoint& local_port, 579 const net::IPEndPoint& destination, 580 scoped_ptr<PacketPipe> to_dest_pipe, 581 scoped_ptr<PacketPipe> from_dest_pipe, 582 net::NetLog* net_log) { 583 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port, 584 destination, 585 to_dest_pipe.Pass(), 586 from_dest_pipe.Pass(), 587 net_log)); 588 return ret.Pass(); 589 } 590 591 } // namespace test 592 } // namespace cast 593 } // namespace media 594