1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <math.h> 6 #include <stdlib.h> 7 #include <vector> 8 9 #include "media/cast/test/utility/udp_proxy.h" 10 11 #include "base/logging.h" 12 #include "base/rand_util.h" 13 #include "base/synchronization/waitable_event.h" 14 #include "base/threading/thread.h" 15 #include "base/time/default_tick_clock.h" 16 #include "net/base/io_buffer.h" 17 #include "net/base/net_errors.h" 18 #include "net/udp/udp_socket.h" 19 20 namespace media { 21 namespace cast { 22 namespace test { 23 24 const size_t kMaxPacketSize = 65536; 25 26 PacketPipe::PacketPipe() {} 27 PacketPipe::~PacketPipe() {} 28 void PacketPipe::InitOnIOThread( 29 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 30 base::TickClock* clock) { 31 task_runner_ = task_runner; 32 clock_ = clock; 33 if (pipe_) { 34 pipe_->InitOnIOThread(task_runner, clock); 35 } 36 } 37 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) { 38 if (pipe_) { 39 pipe_->AppendToPipe(pipe.Pass()); 40 } else { 41 pipe_ = pipe.Pass(); 42 } 43 } 44 45 // Roughly emulates a buffer inside a device. 46 // If the buffer is full, packets are dropped. 47 // Packets are output at a maximum bandwidth. 48 class Buffer : public PacketPipe { 49 public: 50 Buffer(size_t buffer_size, double max_megabits_per_second) 51 : buffer_size_(0), 52 max_buffer_size_(buffer_size), 53 max_megabits_per_second_(max_megabits_per_second), 54 weak_factory_(this) { 55 CHECK_GT(max_buffer_size_, 0UL); 56 CHECK_GT(max_megabits_per_second, 0); 57 } 58 59 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE { 60 if (packet->size() + buffer_size_ <= max_buffer_size_) { 61 buffer_size_ += packet->size(); 62 buffer_.push_back(linked_ptr<Packet>(packet.release())); 63 if (buffer_.size() == 1) { 64 Schedule(); 65 } 66 } 67 } 68 69 private: 70 void Schedule() { 71 last_schedule_ = clock_->NowTicks(); 72 double megabits = buffer_.front()->size() * 8 / 1000000.0; 73 double seconds = megabits / max_megabits_per_second_; 74 int64 microseconds = static_cast<int64>(seconds * 1E6); 75 task_runner_->PostDelayedTask( 76 FROM_HERE, 77 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()), 78 base::TimeDelta::FromMicroseconds(microseconds)); 79 } 80 81 void ProcessBuffer() { 82 int64 bytes_to_send = static_cast<int64>( 83 (clock_->NowTicks() - last_schedule_).InSecondsF() * 84 max_megabits_per_second_ * 1E6 / 8); 85 if (bytes_to_send < static_cast<int64>(buffer_.front()->size())) { 86 bytes_to_send = buffer_.front()->size(); 87 } 88 while (!buffer_.empty() && 89 static_cast<int64>(buffer_.front()->size()) <= bytes_to_send) { 90 CHECK(!buffer_.empty()); 91 scoped_ptr<Packet> packet(buffer_.front().release()); 92 bytes_to_send -= packet->size(); 93 buffer_size_ -= packet->size(); 94 buffer_.pop_front(); 95 pipe_->Send(packet.Pass()); 96 } 97 if (!buffer_.empty()) { 98 Schedule(); 99 } 100 } 101 102 std::deque<linked_ptr<Packet> > buffer_; 103 base::TimeTicks last_schedule_; 104 size_t buffer_size_; 105 size_t max_buffer_size_; 106 double max_megabits_per_second_; // megabits per second 107 base::WeakPtrFactory<Buffer> weak_factory_; 108 }; 109 110 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) { 111 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass(); 112 } 113 114 class RandomDrop : public PacketPipe { 115 public: 116 RandomDrop(double drop_fraction) 117 : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {} 118 119 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE { 120 if (rand() > drop_fraction_) { 121 pipe_->Send(packet.Pass()); 122 } 123 } 124 125 private: 126 int drop_fraction_; 127 }; 128 129 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) { 130 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass(); 131 } 132 133 class SimpleDelayBase : public PacketPipe { 134 public: 135 SimpleDelayBase() : weak_factory_(this) {} 136 virtual ~SimpleDelayBase() {} 137 138 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE { 139 double seconds = GetDelay(); 140 task_runner_->PostDelayedTask( 141 FROM_HERE, 142 base::Bind(&SimpleDelayBase::SendInternal, 143 weak_factory_.GetWeakPtr(), 144 base::Passed(&packet)), 145 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6))); 146 } 147 protected: 148 virtual double GetDelay() = 0; 149 150 private: 151 virtual void SendInternal(scoped_ptr<Packet> packet) { 152 pipe_->Send(packet.Pass()); 153 } 154 155 base::WeakPtrFactory<SimpleDelayBase> weak_factory_; 156 }; 157 158 class ConstantDelay : public SimpleDelayBase { 159 public: 160 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {} 161 virtual double GetDelay() OVERRIDE { 162 return delay_seconds_; 163 } 164 165 private: 166 double delay_seconds_; 167 }; 168 169 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) { 170 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass(); 171 } 172 173 class RandomUnsortedDelay : public SimpleDelayBase { 174 public: 175 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {} 176 177 virtual double GetDelay() OVERRIDE { 178 return random_delay_ * base::RandDouble(); 179 } 180 181 private: 182 double random_delay_; 183 }; 184 185 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) { 186 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass(); 187 } 188 189 class DuplicateAndDelay : public RandomUnsortedDelay { 190 public: 191 DuplicateAndDelay(double delay_min, 192 double random_delay) : 193 RandomUnsortedDelay(random_delay), 194 delay_min_(delay_min) { 195 } 196 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE { 197 pipe_->Send(scoped_ptr<Packet>(new Packet(*packet.get()))); 198 RandomUnsortedDelay::Send(packet.Pass()); 199 } 200 virtual double GetDelay() OVERRIDE { 201 return RandomUnsortedDelay::GetDelay() + delay_min_; 202 } 203 private: 204 double delay_min_; 205 }; 206 207 scoped_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min, 208 double random_delay) { 209 return scoped_ptr<PacketPipe>( 210 new DuplicateAndDelay(delay_min, random_delay)).Pass(); 211 } 212 213 class RandomSortedDelay : public PacketPipe { 214 public: 215 RandomSortedDelay(double random_delay, 216 double extra_delay, 217 double seconds_between_extra_delay) 218 : random_delay_(random_delay), 219 extra_delay_(extra_delay), 220 seconds_between_extra_delay_(seconds_between_extra_delay), 221 weak_factory_(this) {} 222 223 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE { 224 buffer_.push_back(linked_ptr<Packet>(packet.release())); 225 if (buffer_.size() == 1) { 226 next_send_ = std::max( 227 clock_->NowTicks() + 228 base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_), 229 next_send_); 230 ProcessBuffer(); 231 } 232 } 233 virtual void InitOnIOThread( 234 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 235 base::TickClock* clock) OVERRIDE { 236 PacketPipe::InitOnIOThread(task_runner, clock); 237 // As we start the stream, assume that we are in a random 238 // place between two extra delays, thus multiplier = 1.0; 239 ScheduleExtraDelay(1.0); 240 } 241 242 private: 243 void ScheduleExtraDelay(double mult) { 244 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble(); 245 int64 microseconds = static_cast<int64>(seconds * 1E6); 246 task_runner_->PostDelayedTask( 247 FROM_HERE, 248 base::Bind(&RandomSortedDelay::CauseExtraDelay, 249 weak_factory_.GetWeakPtr()), 250 base::TimeDelta::FromMicroseconds(microseconds)); 251 } 252 253 void CauseExtraDelay() { 254 next_send_ = std::max<base::TimeTicks>( 255 clock_->NowTicks() + 256 base::TimeDelta::FromMicroseconds( 257 static_cast<int64>(extra_delay_ * 1E6)), 258 next_send_); 259 // An extra delay just happened, wait up to seconds_between_extra_delay_*2 260 // before scheduling another one to make the average equal to 261 // seconds_between_extra_delay_. 262 ScheduleExtraDelay(2.0); 263 } 264 265 void ProcessBuffer() { 266 base::TimeTicks now = clock_->NowTicks(); 267 while (!buffer_.empty() && next_send_ <= now) { 268 scoped_ptr<Packet> packet(buffer_.front().release()); 269 pipe_->Send(packet.Pass()); 270 buffer_.pop_front(); 271 272 next_send_ += base::TimeDelta::FromSecondsD( 273 base::RandDouble() * random_delay_); 274 } 275 276 if (!buffer_.empty()) { 277 task_runner_->PostDelayedTask( 278 FROM_HERE, 279 base::Bind(&RandomSortedDelay::ProcessBuffer, 280 weak_factory_.GetWeakPtr()), 281 next_send_ - now); 282 } 283 } 284 285 base::TimeTicks block_until_; 286 std::deque<linked_ptr<Packet> > buffer_; 287 double random_delay_; 288 double extra_delay_; 289 double seconds_between_extra_delay_; 290 base::WeakPtrFactory<RandomSortedDelay> weak_factory_; 291 base::TimeTicks next_send_; 292 }; 293 294 scoped_ptr<PacketPipe> NewRandomSortedDelay( 295 double random_delay, 296 double extra_delay, 297 double seconds_between_extra_delay) { 298 return scoped_ptr<PacketPipe>( 299 new RandomSortedDelay( 300 random_delay, extra_delay, seconds_between_extra_delay)) 301 .Pass(); 302 } 303 304 class NetworkGlitchPipe : public PacketPipe { 305 public: 306 NetworkGlitchPipe(double average_work_time, double average_outage_time) 307 : works_(false), 308 max_work_time_(average_work_time * 2), 309 max_outage_time_(average_outage_time * 2), 310 weak_factory_(this) {} 311 312 virtual void InitOnIOThread( 313 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 314 base::TickClock* clock) OVERRIDE { 315 PacketPipe::InitOnIOThread(task_runner, clock); 316 Flip(); 317 } 318 319 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE { 320 if (works_) { 321 pipe_->Send(packet.Pass()); 322 } 323 } 324 325 private: 326 void Flip() { 327 works_ = !works_; 328 double seconds = base::RandDouble() * 329 (works_ ? max_work_time_ : max_outage_time_); 330 int64 microseconds = static_cast<int64>(seconds * 1E6); 331 task_runner_->PostDelayedTask( 332 FROM_HERE, 333 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()), 334 base::TimeDelta::FromMicroseconds(microseconds)); 335 } 336 337 bool works_; 338 double max_work_time_; 339 double max_outage_time_; 340 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_; 341 }; 342 343 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time, 344 double average_outage_time) { 345 return scoped_ptr<PacketPipe>( 346 new NetworkGlitchPipe(average_work_time, average_outage_time)) 347 .Pass(); 348 } 349 350 351 // Internal buffer object for a client of the IPP model. 352 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe { 353 public: 354 InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp, 355 size_t size) 356 : ipp_(ipp), 357 stored_size_(0), 358 stored_limit_(size), 359 clock_(NULL), 360 weak_factory_(this) { 361 } 362 363 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE { 364 // Drop if buffer is full. 365 if (stored_size_ >= stored_limit_) 366 return; 367 stored_size_ += packet->size(); 368 buffer_.push_back(linked_ptr<Packet>(packet.release())); 369 buffer_time_.push_back(clock_->NowTicks()); 370 DCHECK(buffer_.size() == buffer_time_.size()); 371 } 372 373 virtual void InitOnIOThread( 374 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 375 base::TickClock* clock) OVERRIDE { 376 clock_ = clock; 377 if (ipp_) 378 ipp_->InitOnIOThread(task_runner, clock); 379 PacketPipe::InitOnIOThread(task_runner, clock); 380 } 381 382 void SendOnePacket() { 383 scoped_ptr<Packet> packet(buffer_.front().release()); 384 stored_size_ -= packet->size(); 385 buffer_.pop_front(); 386 buffer_time_.pop_front(); 387 pipe_->Send(packet.Pass()); 388 DCHECK(buffer_.size() == buffer_time_.size()); 389 } 390 391 bool Empty() const { 392 return buffer_.empty(); 393 } 394 395 base::TimeTicks FirstPacketTime() const { 396 DCHECK(!buffer_time_.empty()); 397 return buffer_time_.front(); 398 } 399 400 base::WeakPtr<InternalBuffer> GetWeakPtr() { 401 return weak_factory_.GetWeakPtr(); 402 403 } 404 405 private: 406 const base::WeakPtr<InterruptedPoissonProcess> ipp_; 407 size_t stored_size_; 408 const size_t stored_limit_; 409 std::deque<linked_ptr<Packet> > buffer_; 410 std::deque<base::TimeTicks> buffer_time_; 411 base::TickClock* clock_; 412 base::WeakPtrFactory<InternalBuffer> weak_factory_; 413 414 DISALLOW_COPY_AND_ASSIGN(InternalBuffer); 415 }; 416 417 InterruptedPoissonProcess::InterruptedPoissonProcess( 418 const std::vector<double>& average_rates, 419 double coef_burstiness, 420 double coef_variance, 421 uint32 rand_seed) 422 : clock_(NULL), 423 average_rates_(average_rates), 424 coef_burstiness_(coef_burstiness), 425 coef_variance_(coef_variance), 426 rate_index_(0), 427 on_state_(true), 428 weak_factory_(this) { 429 mt_rand_.init_genrand(rand_seed); 430 DCHECK(!average_rates.empty()); 431 ComputeRates(); 432 } 433 434 InterruptedPoissonProcess::~InterruptedPoissonProcess() { 435 } 436 437 void InterruptedPoissonProcess::InitOnIOThread( 438 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 439 base::TickClock* clock) { 440 // Already initialized and started. 441 if (task_runner_.get() && clock_) 442 return; 443 task_runner_ = task_runner; 444 clock_ = clock; 445 UpdateRates(); 446 SwitchOn(); 447 SendPacket(); 448 } 449 450 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) { 451 scoped_ptr<InternalBuffer> buffer( 452 new InternalBuffer(weak_factory_.GetWeakPtr(), size)); 453 send_buffers_.push_back(buffer->GetWeakPtr()); 454 return buffer.PassAs<PacketPipe>(); 455 } 456 457 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) { 458 // Rate is per milliseconds. 459 // The time until next event is exponentially distributed to the 460 // inverse of |rate|. 461 return base::TimeDelta::FromMillisecondsD( 462 fabs(-log(1.0 - RandDouble()) / rate)); 463 } 464 465 double InterruptedPoissonProcess::RandDouble() { 466 // Generate a 64-bits random number from MT19937 and then convert 467 // it to double. 468 uint64 rand = mt_rand_.genrand_int32(); 469 rand <<= 32; 470 rand |= mt_rand_.genrand_int32(); 471 return base::BitsToOpenEndedUnitInterval(rand); 472 } 473 474 void InterruptedPoissonProcess::ComputeRates() { 475 double avg_rate = average_rates_[rate_index_]; 476 477 send_rate_ = avg_rate / coef_burstiness_; 478 switch_off_rate_ = 479 2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) / 480 coef_burstiness_ / (coef_variance_ - 1); 481 switch_on_rate_ = 482 2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1); 483 } 484 485 void InterruptedPoissonProcess::UpdateRates() { 486 ComputeRates(); 487 488 // Rates are updated once per second. 489 rate_index_ = (rate_index_ + 1) % average_rates_.size(); 490 task_runner_->PostDelayedTask( 491 FROM_HERE, 492 base::Bind(&InterruptedPoissonProcess::UpdateRates, 493 weak_factory_.GetWeakPtr()), 494 base::TimeDelta::FromSeconds(1)); 495 } 496 497 void InterruptedPoissonProcess::SwitchOff() { 498 on_state_ = false; 499 task_runner_->PostDelayedTask( 500 FROM_HERE, 501 base::Bind(&InterruptedPoissonProcess::SwitchOn, 502 weak_factory_.GetWeakPtr()), 503 NextEvent(switch_on_rate_)); 504 } 505 506 void InterruptedPoissonProcess::SwitchOn() { 507 on_state_ = true; 508 task_runner_->PostDelayedTask( 509 FROM_HERE, 510 base::Bind(&InterruptedPoissonProcess::SwitchOff, 511 weak_factory_.GetWeakPtr()), 512 NextEvent(switch_off_rate_)); 513 } 514 515 void InterruptedPoissonProcess::SendPacket() { 516 task_runner_->PostDelayedTask( 517 FROM_HERE, 518 base::Bind(&InterruptedPoissonProcess::SendPacket, 519 weak_factory_.GetWeakPtr()), 520 NextEvent(send_rate_)); 521 522 // If OFF then don't send. 523 if (!on_state_) 524 return; 525 526 // Find the earliest packet to send. 527 base::TimeTicks earliest_time; 528 for (size_t i = 0; i < send_buffers_.size(); ++i) { 529 if (!send_buffers_[i]) 530 continue; 531 if (send_buffers_[i]->Empty()) 532 continue; 533 if (earliest_time.is_null() || 534 send_buffers_[i]->FirstPacketTime() < earliest_time) 535 earliest_time = send_buffers_[i]->FirstPacketTime(); 536 } 537 for (size_t i = 0; i < send_buffers_.size(); ++i) { 538 if (!send_buffers_[i]) 539 continue; 540 if (send_buffers_[i]->Empty()) 541 continue; 542 if (send_buffers_[i]->FirstPacketTime() != earliest_time) 543 continue; 544 send_buffers_[i]->SendOnePacket(); 545 break; 546 } 547 } 548 549 class UDPProxyImpl; 550 551 class PacketSender : public PacketPipe { 552 public: 553 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination) 554 : udp_proxy_(udp_proxy), destination_(destination) {} 555 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE; 556 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE { 557 NOTREACHED(); 558 } 559 560 private: 561 UDPProxyImpl* udp_proxy_; 562 const net::IPEndPoint* destination_; // not owned 563 }; 564 565 namespace { 566 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) { 567 if (*pipe) { 568 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass()); 569 } else { 570 pipe->reset(next); 571 } 572 } 573 } // namespace 574 575 scoped_ptr<PacketPipe> GoodNetwork() { 576 // This represents the buffer on the sender. 577 scoped_ptr<PacketPipe> pipe; 578 BuildPipe(&pipe, new Buffer(2 << 20, 50)); 579 BuildPipe(&pipe, new ConstantDelay(1E-3)); 580 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 2E-3, 3)); 581 // This represents the buffer on the receiving device. 582 BuildPipe(&pipe, new Buffer(2 << 20, 50)); 583 return pipe.Pass(); 584 } 585 586 scoped_ptr<PacketPipe> WifiNetwork() { 587 // This represents the buffer on the sender. 588 scoped_ptr<PacketPipe> pipe; 589 BuildPipe(&pipe, new Buffer(256 << 10, 20)); 590 BuildPipe(&pipe, new RandomDrop(0.005)); 591 // This represents the buffer on the router. 592 BuildPipe(&pipe, new ConstantDelay(1E-3)); 593 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3)); 594 BuildPipe(&pipe, new Buffer(256 << 10, 20)); 595 BuildPipe(&pipe, new ConstantDelay(1E-3)); 596 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3)); 597 BuildPipe(&pipe, new RandomDrop(0.005)); 598 // This represents the buffer on the receiving device. 599 BuildPipe(&pipe, new Buffer(256 << 10, 20)); 600 return pipe.Pass(); 601 } 602 603 scoped_ptr<PacketPipe> BadNetwork() { 604 scoped_ptr<PacketPipe> pipe; 605 // This represents the buffer on the sender. 606 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s 607 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop 608 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1)); 609 // This represents the buffer on the router. 610 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 4mbit/s 611 BuildPipe(&pipe, new ConstantDelay(1E-3)); 612 // Random 40ms every other second 613 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1)); 614 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3)); 615 // This represents the buffer on the receiving device. 616 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s 617 return pipe.Pass(); 618 } 619 620 621 scoped_ptr<PacketPipe> EvilNetwork() { 622 // This represents the buffer on the sender. 623 scoped_ptr<PacketPipe> pipe; 624 BuildPipe(&pipe, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s 625 // This represents the buffer on the router. 626 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop 627 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1)); 628 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s 629 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop 630 BuildPipe(&pipe, new ConstantDelay(1E-3)); 631 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3)); 632 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3)); 633 // This represents the buffer on the receiving device. 634 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s 635 return pipe.Pass(); 636 } 637 638 class UDPProxyImpl : public UDPProxy { 639 public: 640 UDPProxyImpl(const net::IPEndPoint& local_port, 641 const net::IPEndPoint& destination, 642 scoped_ptr<PacketPipe> to_dest_pipe, 643 scoped_ptr<PacketPipe> from_dest_pipe, 644 net::NetLog* net_log) 645 : local_port_(local_port), 646 destination_(destination), 647 destination_is_mutable_(destination.address().empty()), 648 proxy_thread_("media::cast::test::UdpProxy Thread"), 649 to_dest_pipe_(to_dest_pipe.Pass()), 650 from_dest_pipe_(from_dest_pipe.Pass()), 651 blocked_(false), 652 weak_factory_(this) { 653 proxy_thread_.StartWithOptions( 654 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); 655 base::WaitableEvent start_event(false, false); 656 proxy_thread_.message_loop_proxy()->PostTask( 657 FROM_HERE, 658 base::Bind(&UDPProxyImpl::Start, 659 base::Unretained(this), 660 base::Unretained(&start_event), 661 net_log)); 662 start_event.Wait(); 663 } 664 665 virtual ~UDPProxyImpl() { 666 base::WaitableEvent stop_event(false, false); 667 proxy_thread_.message_loop_proxy()->PostTask( 668 FROM_HERE, 669 base::Bind(&UDPProxyImpl::Stop, 670 base::Unretained(this), 671 base::Unretained(&stop_event))); 672 stop_event.Wait(); 673 proxy_thread_.Stop(); 674 } 675 676 void Send(scoped_ptr<Packet> packet, 677 const net::IPEndPoint& destination) { 678 if (blocked_) { 679 LOG(ERROR) << "Cannot write packet right now: blocked"; 680 return; 681 } 682 683 VLOG(1) << "Sending packet, len = " << packet->size(); 684 // We ignore all problems, callbacks and errors. 685 // If it didn't work we just drop the packet at and call it a day. 686 scoped_refptr<net::IOBuffer> buf = 687 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front())); 688 size_t buf_size = packet->size(); 689 int result; 690 if (destination.address().empty()) { 691 VLOG(1) << "Destination has not been set yet."; 692 result = net::ERR_INVALID_ARGUMENT; 693 } else { 694 VLOG(1) << "Destination:" << destination.ToString(); 695 result = socket_->SendTo(buf.get(), 696 static_cast<int>(buf_size), 697 destination, 698 base::Bind(&UDPProxyImpl::AllowWrite, 699 weak_factory_.GetWeakPtr(), 700 buf, 701 base::Passed(&packet))); 702 } 703 if (result == net::ERR_IO_PENDING) { 704 blocked_ = true; 705 } else if (result < 0) { 706 LOG(ERROR) << "Failed to write packet."; 707 } 708 } 709 710 private: 711 void Start(base::WaitableEvent* start_event, 712 net::NetLog* net_log) { 713 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, 714 net::RandIntCallback(), 715 net_log, 716 net::NetLog::Source())); 717 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_)); 718 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_)); 719 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 720 &tick_clock_); 721 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(), 722 &tick_clock_); 723 724 VLOG(0) << "From:" << local_port_.ToString(); 725 if (!destination_is_mutable_) 726 VLOG(0) << "To:" << destination_.ToString(); 727 728 CHECK_GE(socket_->Bind(local_port_), 0); 729 730 start_event->Signal(); 731 PollRead(); 732 } 733 734 void Stop(base::WaitableEvent* stop_event) { 735 to_dest_pipe_.reset(NULL); 736 from_dest_pipe_.reset(NULL); 737 socket_.reset(NULL); 738 stop_event->Signal(); 739 } 740 741 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) { 742 DCHECK_NE(len, net::ERR_IO_PENDING); 743 VLOG(1) << "Got packet, len = " << len; 744 if (len < 0) { 745 LOG(WARNING) << "Socket read error: " << len; 746 return; 747 } 748 packet_->resize(len); 749 if (destination_is_mutable_ && set_destination_next_ && 750 !(recv_address_ == return_address_) && 751 !(recv_address_ == destination_)) { 752 destination_ = recv_address_; 753 } 754 if (recv_address_ == destination_) { 755 set_destination_next_ = false; 756 from_dest_pipe_->Send(packet_.Pass()); 757 } else { 758 set_destination_next_ = true; 759 VLOG(1) << "Return address = " << recv_address_.ToString(); 760 return_address_ = recv_address_; 761 to_dest_pipe_->Send(packet_.Pass()); 762 } 763 } 764 765 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) { 766 ProcessPacket(recv_buf, len); 767 PollRead(); 768 } 769 770 void PollRead() { 771 while (true) { 772 packet_.reset(new Packet(kMaxPacketSize)); 773 scoped_refptr<net::IOBuffer> recv_buf = 774 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front())); 775 int len = socket_->RecvFrom( 776 recv_buf.get(), 777 kMaxPacketSize, 778 &recv_address_, 779 base::Bind( 780 &UDPProxyImpl::ReadCallback, base::Unretained(this), recv_buf)); 781 if (len == net::ERR_IO_PENDING) 782 break; 783 ProcessPacket(recv_buf, len); 784 } 785 } 786 787 void AllowWrite(scoped_refptr<net::IOBuffer> buf, 788 scoped_ptr<Packet> packet, 789 int unused_len) { 790 DCHECK(blocked_); 791 blocked_ = false; 792 } 793 794 // Input 795 net::IPEndPoint local_port_; 796 797 net::IPEndPoint destination_; 798 bool destination_is_mutable_; 799 800 net::IPEndPoint return_address_; 801 bool set_destination_next_; 802 803 base::DefaultTickClock tick_clock_; 804 base::Thread proxy_thread_; 805 scoped_ptr<net::UDPSocket> socket_; 806 scoped_ptr<PacketPipe> to_dest_pipe_; 807 scoped_ptr<PacketPipe> from_dest_pipe_; 808 809 // For receiving. 810 net::IPEndPoint recv_address_; 811 scoped_ptr<Packet> packet_; 812 813 // For sending. 814 bool blocked_; 815 816 base::WeakPtrFactory<UDPProxyImpl> weak_factory_; 817 }; 818 819 void PacketSender::Send(scoped_ptr<Packet> packet) { 820 udp_proxy_->Send(packet.Pass(), *destination_); 821 } 822 823 scoped_ptr<UDPProxy> UDPProxy::Create( 824 const net::IPEndPoint& local_port, 825 const net::IPEndPoint& destination, 826 scoped_ptr<PacketPipe> to_dest_pipe, 827 scoped_ptr<PacketPipe> from_dest_pipe, 828 net::NetLog* net_log) { 829 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port, 830 destination, 831 to_dest_pipe.Pass(), 832 from_dest_pipe.Pass(), 833 net_log)); 834 return ret.Pass(); 835 } 836 837 } // namespace test 838 } // namespace cast 839 } // namespace media 840