Home | History | Annotate | Download | only in transport
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "media/cast/transport/transport/udp_transport.h"
      6 
      7 #include <algorithm>
      8 #include <string>
      9 
     10 #include "base/bind.h"
     11 #include "base/logging.h"
     12 #include "base/memory/ref_counted.h"
     13 #include "base/memory/scoped_ptr.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/rand_util.h"
     16 #include "net/base/io_buffer.h"
     17 #include "net/base/net_errors.h"
     18 #include "net/base/rand_callback.h"
     19 
     20 namespace media {
     21 namespace cast {
     22 namespace transport {
     23 
     24 namespace {
// Size, in bytes, of the buffer allocated for every incoming datagram; it is
// also the maximum length handed to RecvFrom() in ReceiveNextPacket().
// NOTE(review): 1500 presumably mirrors the usual Ethernet MTU -- confirm.
const int kMaxPacketSize = 1500;
     26 
     27 bool IsEmpty(const net::IPEndPoint& addr) {
     28   net::IPAddressNumber empty_addr(addr.address().size());
     29   return std::equal(
     30              empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
     31          !addr.port();
     32 }
     33 
     34 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
     35   return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
     36                                                     addr1.address().end(),
     37                                                     addr2.address().begin());
     38 }
     39 }  // namespace
     40 
// Constructs the transport and its UDP socket.  The socket is neither bound
// nor connected here; that happens in StartReceiving().
//
// |net_log| is forwarded to the socket.
// |io_thread_proxy| is the task runner every other method DCHECKs it is
//     running on, and the one receive tasks are posted to.
// |local_end_point| / |remote_end_point| are stored as |local_addr_| and
//     |remote_addr_|; at least one of them must be non-empty (DCHECKed below).
// |status_callback| is stored and later run with TRANSPORT_SOCKET_ERROR when
//     a socket operation fails.
UdpTransport::UdpTransport(
    net::NetLog* net_log,
    const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
    const net::IPEndPoint& local_end_point,
    const net::IPEndPoint& remote_end_point,
    const CastTransportStatusCallback& status_callback)
    : io_thread_proxy_(io_thread_proxy),
      local_addr_(local_end_point),
      remote_addr_(remote_end_point),
      udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
                                     net::RandIntCallback(),
                                     net_log,
                                     net::NetLog::Source())),
      // No send or receive is in flight yet, and Connect() has not been called.
      send_pending_(false),
      receive_pending_(false),
      client_connected_(false),
      // SendPacket() only touches the socket's DSCP when this differs from
      // DSCP_NO_CHANGE, so by default the socket setting is left alone.
      next_dscp_value_(net::DSCP_NO_CHANGE),
      status_callback_(status_callback),
      weak_factory_(this) {
  // With both end points empty StartReceiving() could neither bind nor connect.
  DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
}
     62 
// No explicit teardown is performed here; the body is intentionally empty.
UdpTransport::~UdpTransport() {}
     64 
     65 void UdpTransport::StartReceiving(
     66     const PacketReceiverCallback& packet_receiver) {
     67   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
     68 
     69   packet_receiver_ = packet_receiver;
     70   udp_socket_->AllowAddressReuse();
     71   udp_socket_->SetMulticastLoopbackMode(true);
     72   if (!IsEmpty(local_addr_)) {
     73     if (udp_socket_->Bind(local_addr_) < 0) {
     74       status_callback_.Run(TRANSPORT_SOCKET_ERROR);
     75       LOG(ERROR) << "Failed to bind local address.";
     76       return;
     77     }
     78   } else if (!IsEmpty(remote_addr_)) {
     79     if (udp_socket_->Connect(remote_addr_) < 0) {
     80       status_callback_.Run(TRANSPORT_SOCKET_ERROR);
     81       LOG(ERROR) << "Failed to connect to remote address.";
     82       return;
     83     }
     84     client_connected_ = true;
     85   } else {
     86     NOTREACHED() << "Either local or remote address has to be defined.";
     87   }
     88 
     89   ScheduleReceiveNextPacket();
     90 }
     91 
// Records |dscp| so that the next SendPacket() call applies it to the socket;
// the socket itself is not touched here.  Must run on |io_thread_proxy_|.
void UdpTransport::SetDscp(net::DiffServCodePoint dscp) {
  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
  next_dscp_value_ = dscp;
}
     96 
     97 void UdpTransport::ScheduleReceiveNextPacket() {
     98   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
     99   if (!packet_receiver_.is_null() && !receive_pending_) {
    100     receive_pending_ = true;
    101     io_thread_proxy_->PostTask(FROM_HERE,
    102                                base::Bind(&UdpTransport::ReceiveNextPacket,
    103                                           weak_factory_.GetWeakPtr(),
    104                                           net::ERR_IO_PENDING));
    105   }
    106 }
    107 
// Body of the receive loop.  |length_or_status| is one of:
//   - net::ERR_IO_PENDING: no read has been issued yet, so start one;
//   - any other negative value: a receive error code;
//   - a non-negative value: the number of bytes just read into |next_packet_|.
// This method is posted with ERR_IO_PENDING by ScheduleReceiveNextPacket() and
// is also the completion callback handed to RecvFrom() below.
void UdpTransport::ReceiveNextPacket(int length_or_status) {
  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());

  // Loop while UdpSocket is delivering data synchronously.  When it responds
  // with a "pending" status, break and expect this method to be called back in
  // the future when a packet is ready.
  while (true) {
    if (length_or_status == net::ERR_IO_PENDING) {
      // Allocate a full-size packet and build |recv_buf_| from a pointer to
      // its first byte, so the read targets the packet's own storage.
      next_packet_.reset(new Packet(kMaxPacketSize));
      recv_buf_ = new net::WrappedIOBuffer(
          reinterpret_cast<char*>(&next_packet_->front()));
      length_or_status = udp_socket_->RecvFrom(
          recv_buf_,
          kMaxPacketSize,
          &recv_addr_,
          base::Bind(&UdpTransport::ReceiveNextPacket,
                     weak_factory_.GetWeakPtr()));
      if (length_or_status == net::ERR_IO_PENDING) {
        // The read is still outstanding; this method was handed to RecvFrom()
        // as the callback that resumes the loop.
        receive_pending_ = true;
        return;
      }
    }

    // Note: At this point, either a packet is ready or an error has occurred.
    if (length_or_status < 0) {
      VLOG(1) << "Failed to receive packet: Status code is "
              << length_or_status;
      status_callback_.Run(TRANSPORT_SOCKET_ERROR);
      // Clearing the flag lets ScheduleReceiveNextPacket() start the loop
      // again later (SendPacket() and OnSent() call it after a good send).
      receive_pending_ = false;
      return;
    }

    // Confirm the packet has come from the expected remote address; otherwise,
    // ignore it.  If this is the first packet being received and no remote
    // address has been set, set the remote address and expect all future
    // packets to come from the same one.
    // TODO(hubbe): We should only do this if the caller used a valid ssrc.
    if (IsEmpty(remote_addr_)) {
      remote_addr_ = recv_addr_;
      VLOG(1) << "Setting remote address from first received packet: "
              << remote_addr_.ToString();
    } else if (!IsEqual(remote_addr_, recv_addr_)) {
      VLOG(1) << "Ignoring packet received from an unrecognized address: "
              << recv_addr_.ToString() << ".";
      // Resetting the status makes the next iteration allocate a fresh packet
      // and issue a new read, discarding this one.
      length_or_status = net::ERR_IO_PENDING;
      continue;
    }

    // Trim the kMaxPacketSize buffer to the bytes actually received, pass the
    // packet to the receiver, then go around again to read the next one.
    next_packet_->resize(length_or_status);
    packet_receiver_.Run(next_packet_.Pass());
    length_or_status = net::ERR_IO_PENDING;
  }
}
    161 
    162 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
    163   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
    164 
    165   DCHECK(!send_pending_);
    166   if (send_pending_) {
    167     VLOG(1) << "Cannot send because of pending IO.";
    168     return true;
    169   }
    170 
    171   if (next_dscp_value_ != net::DSCP_NO_CHANGE) {
    172     int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_);
    173     if (result != net::OK) {
    174       LOG(ERROR) << "Unable to set DSCP: " << next_dscp_value_
    175                  << " to socket; Error: " << result;
    176     }
    177     // Don't change DSCP in next send.
    178     next_dscp_value_ = net::DSCP_NO_CHANGE;
    179   }
    180 
    181   scoped_refptr<net::IOBuffer> buf =
    182       new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
    183 
    184   int result;
    185   base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
    186                                                   weak_factory_.GetWeakPtr(),
    187                                                   buf,
    188                                                   packet,
    189                                                   cb);
    190   if (client_connected_) {
    191     // If we called Connect() before we must call Write() instead of
    192     // SendTo(). Otherwise on some platforms we might get
    193     // ERR_SOCKET_IS_CONNECTED.
    194     result = udp_socket_->Write(buf,
    195                                 static_cast<int>(packet->data.size()),
    196                                 callback);
    197   } else if (!IsEmpty(remote_addr_)) {
    198     result = udp_socket_->SendTo(buf,
    199                                  static_cast<int>(packet->data.size()),
    200                                  remote_addr_,
    201                                  callback);
    202   } else {
    203     return true;
    204   }
    205 
    206   if (result == net::ERR_IO_PENDING) {
    207     send_pending_ = true;
    208     return false;
    209   } else if (result < 0) {
    210     LOG(ERROR) << "Failed to send packet: " << result << ".";
    211     status_callback_.Run(TRANSPORT_SOCKET_ERROR);
    212     return true;
    213   } else {
    214     // Successful send, re-start reading if needed.
    215     ScheduleReceiveNextPacket();
    216     return true;
    217   }
    218 }
    219 
    220 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
    221                           PacketRef packet,
    222                           const base::Closure& cb,
    223                           int result) {
    224   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
    225 
    226   send_pending_ = false;
    227   if (result < 0) {
    228     LOG(ERROR) << "Failed to send packet: " << result << ".";
    229     status_callback_.Run(TRANSPORT_SOCKET_ERROR);
    230   } else {
    231     // Successful send, re-start reading if needed.
    232     ScheduleReceiveNextPacket();
    233   }
    234 
    235   if (!cb.is_null()) {
    236     cb.Run();
    237   }
    238 }
    239 
    240 }  // namespace transport
    241 }  // namespace cast
    242 }  // namespace media
    243