1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "media/cast/transport/transport/udp_transport.h" 6 7 #include <algorithm> 8 #include <string> 9 10 #include "base/bind.h" 11 #include "base/logging.h" 12 #include "base/memory/ref_counted.h" 13 #include "base/memory/scoped_ptr.h" 14 #include "base/message_loop/message_loop.h" 15 #include "base/rand_util.h" 16 #include "net/base/io_buffer.h" 17 #include "net/base/net_errors.h" 18 #include "net/base/rand_callback.h" 19 20 namespace media { 21 namespace cast { 22 namespace transport { 23 24 namespace { 25 const int kMaxPacketSize = 1500; 26 27 bool IsEmpty(const net::IPEndPoint& addr) { 28 net::IPAddressNumber empty_addr(addr.address().size()); 29 return std::equal( 30 empty_addr.begin(), empty_addr.end(), addr.address().begin()) && 31 !addr.port(); 32 } 33 34 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) { 35 return addr1.port() == addr2.port() && std::equal(addr1.address().begin(), 36 addr1.address().end(), 37 addr2.address().begin()); 38 } 39 } // namespace 40 41 UdpTransport::UdpTransport( 42 net::NetLog* net_log, 43 const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy, 44 const net::IPEndPoint& local_end_point, 45 const net::IPEndPoint& remote_end_point, 46 const CastTransportStatusCallback& status_callback) 47 : io_thread_proxy_(io_thread_proxy), 48 local_addr_(local_end_point), 49 remote_addr_(remote_end_point), 50 udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND, 51 net::RandIntCallback(), 52 net_log, 53 net::NetLog::Source())), 54 send_pending_(false), 55 receive_pending_(false), 56 client_connected_(false), 57 next_dscp_value_(net::DSCP_NO_CHANGE), 58 status_callback_(status_callback), 59 weak_factory_(this) { 60 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point)); 61 } 62 63 UdpTransport::~UdpTransport() {} 64 65 void UdpTransport::StartReceiving( 66 const PacketReceiverCallback& packet_receiver) { 67 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 68 69 packet_receiver_ = packet_receiver; 70 udp_socket_->AllowAddressReuse(); 71 udp_socket_->SetMulticastLoopbackMode(true); 72 if (!IsEmpty(local_addr_)) { 73 if (udp_socket_->Bind(local_addr_) < 0) { 74 status_callback_.Run(TRANSPORT_SOCKET_ERROR); 75 LOG(ERROR) << "Failed to bind local address."; 76 return; 77 } 78 } else if (!IsEmpty(remote_addr_)) { 79 if (udp_socket_->Connect(remote_addr_) < 0) { 80 status_callback_.Run(TRANSPORT_SOCKET_ERROR); 81 LOG(ERROR) << "Failed to connect to remote address."; 82 return; 83 } 84 client_connected_ = true; 85 } else { 86 NOTREACHED() << "Either local or remote address has to be defined."; 87 } 88 89 ScheduleReceiveNextPacket(); 90 } 91 92 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) { 93 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 94 next_dscp_value_ = dscp; 95 } 96 97 void UdpTransport::ScheduleReceiveNextPacket() { 98 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 99 if (!packet_receiver_.is_null() && !receive_pending_) { 100 receive_pending_ = true; 101 io_thread_proxy_->PostTask(FROM_HERE, 102 base::Bind(&UdpTransport::ReceiveNextPacket, 103 weak_factory_.GetWeakPtr(), 104 net::ERR_IO_PENDING)); 105 } 106 } 107 108 void UdpTransport::ReceiveNextPacket(int length_or_status) { 109 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 110 111 // Loop while UdpSocket is delivering data synchronously. When it responds 112 // with a "pending" status, break and expect this method to be called back in 113 // the future when a packet is ready. 114 while (true) { 115 if (length_or_status == net::ERR_IO_PENDING) { 116 next_packet_.reset(new Packet(kMaxPacketSize)); 117 recv_buf_ = new net::WrappedIOBuffer( 118 reinterpret_cast<char*>(&next_packet_->front())); 119 length_or_status = udp_socket_->RecvFrom( 120 recv_buf_, 121 kMaxPacketSize, 122 &recv_addr_, 123 base::Bind(&UdpTransport::ReceiveNextPacket, 124 weak_factory_.GetWeakPtr())); 125 if (length_or_status == net::ERR_IO_PENDING) { 126 receive_pending_ = true; 127 return; 128 } 129 } 130 131 // Note: At this point, either a packet is ready or an error has occurred. 132 if (length_or_status < 0) { 133 VLOG(1) << "Failed to receive packet: Status code is " 134 << length_or_status; 135 status_callback_.Run(TRANSPORT_SOCKET_ERROR); 136 receive_pending_ = false; 137 return; 138 } 139 140 // Confirm the packet has come from the expected remote address; otherwise, 141 // ignore it. If this is the first packet being received and no remote 142 // address has been set, set the remote address and expect all future 143 // packets to come from the same one. 144 // TODO(hubbe): We should only do this if the caller used a valid ssrc. 145 if (IsEmpty(remote_addr_)) { 146 remote_addr_ = recv_addr_; 147 VLOG(1) << "Setting remote address from first received packet: " 148 << remote_addr_.ToString(); 149 } else if (!IsEqual(remote_addr_, recv_addr_)) { 150 VLOG(1) << "Ignoring packet received from an unrecognized address: " 151 << recv_addr_.ToString() << "."; 152 length_or_status = net::ERR_IO_PENDING; 153 continue; 154 } 155 156 next_packet_->resize(length_or_status); 157 packet_receiver_.Run(next_packet_.Pass()); 158 length_or_status = net::ERR_IO_PENDING; 159 } 160 } 161 162 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) { 163 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 164 165 DCHECK(!send_pending_); 166 if (send_pending_) { 167 VLOG(1) << "Cannot send because of pending IO."; 168 return true; 169 } 170 171 if (next_dscp_value_ != net::DSCP_NO_CHANGE) { 172 int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_); 173 if (result != net::OK) { 174 LOG(ERROR) << "Unable to set DSCP: " << next_dscp_value_ 175 << " to socket; Error: " << result; 176 } 177 // Don't change DSCP in next send. 178 next_dscp_value_ = net::DSCP_NO_CHANGE; 179 } 180 181 scoped_refptr<net::IOBuffer> buf = 182 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front())); 183 184 int result; 185 base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent, 186 weak_factory_.GetWeakPtr(), 187 buf, 188 packet, 189 cb); 190 if (client_connected_) { 191 // If we called Connect() before we must call Write() instead of 192 // SendTo(). Otherwise on some platforms we might get 193 // ERR_SOCKET_IS_CONNECTED. 194 result = udp_socket_->Write(buf, 195 static_cast<int>(packet->data.size()), 196 callback); 197 } else if (!IsEmpty(remote_addr_)) { 198 result = udp_socket_->SendTo(buf, 199 static_cast<int>(packet->data.size()), 200 remote_addr_, 201 callback); 202 } else { 203 return true; 204 } 205 206 if (result == net::ERR_IO_PENDING) { 207 send_pending_ = true; 208 return false; 209 } else if (result < 0) { 210 LOG(ERROR) << "Failed to send packet: " << result << "."; 211 status_callback_.Run(TRANSPORT_SOCKET_ERROR); 212 return true; 213 } else { 214 // Successful send, re-start reading if needed. 215 ScheduleReceiveNextPacket(); 216 return true; 217 } 218 } 219 220 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, 221 PacketRef packet, 222 const base::Closure& cb, 223 int result) { 224 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); 225 226 send_pending_ = false; 227 if (result < 0) { 228 LOG(ERROR) << "Failed to send packet: " << result << "."; 229 status_callback_.Run(TRANSPORT_SOCKET_ERROR); 230 } else { 231 // Successful send, re-start reading if needed. 232 ScheduleReceiveNextPacket(); 233 } 234 235 if (!cb.is_null()) { 236 cb.Run(); 237 } 238 } 239 240 } // namespace transport 241 } // namespace cast 242 } // namespace media 243