1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/renderer/p2p/ipc_socket_factory.h" 6 7 #include <deque> 8 9 #include "base/compiler_specific.h" 10 #include "base/debug/trace_event.h" 11 #include "base/message_loop/message_loop.h" 12 #include "base/message_loop/message_loop_proxy.h" 13 #include "content/public/renderer/p2p_socket_client_delegate.h" 14 #include "content/renderer/p2p/host_address_request.h" 15 #include "content/renderer/p2p/socket_client_impl.h" 16 #include "content/renderer/p2p/socket_dispatcher.h" 17 #include "jingle/glue/utils.h" 18 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" 19 20 namespace content { 21 22 namespace { 23 24 bool IsTcpClientSocket(P2PSocketType type) { 25 return (type == P2P_SOCKET_STUN_TCP_CLIENT) || 26 (type == P2P_SOCKET_TCP_CLIENT) || 27 (type == P2P_SOCKET_STUN_SSLTCP_CLIENT) || 28 (type == P2P_SOCKET_SSLTCP_CLIENT) || 29 (type == P2P_SOCKET_TLS_CLIENT) || 30 (type == P2P_SOCKET_STUN_TLS_CLIENT); 31 } 32 33 // TODO(miu): This needs tuning. http://crbug.com/237960 34 const size_t kMaximumInFlightBytes = 64 * 1024; // 64 KB 35 36 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface 37 // using P2PSocketClient that works over IPC-channel. It must be used 38 // on the thread it was created. 39 class IpcPacketSocket : public talk_base::AsyncPacketSocket, 40 public P2PSocketClientDelegate { 41 public: 42 IpcPacketSocket(); 43 virtual ~IpcPacketSocket(); 44 45 // Always takes ownership of client even if initialization fails. 46 bool Init(P2PSocketType type, P2PSocketClientImpl* client, 47 const talk_base::SocketAddress& local_address, 48 const talk_base::SocketAddress& remote_address); 49 50 // talk_base::AsyncPacketSocket interface. 51 virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE; 52 virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE; 53 virtual int Send(const void *pv, size_t cb, 54 talk_base::DiffServCodePoint dscp) OVERRIDE; 55 virtual int SendTo(const void *pv, size_t cb, 56 const talk_base::SocketAddress& addr, 57 talk_base::DiffServCodePoint dscp) OVERRIDE; 58 virtual int Close() OVERRIDE; 59 virtual State GetState() const OVERRIDE; 60 virtual int GetOption(talk_base::Socket::Option opt, int* value) OVERRIDE; 61 virtual int SetOption(talk_base::Socket::Option opt, int value) OVERRIDE; 62 virtual int GetError() const OVERRIDE; 63 virtual void SetError(int error) OVERRIDE; 64 65 // P2PSocketClientDelegate implementation. 66 virtual void OnOpen(const net::IPEndPoint& address) OVERRIDE; 67 virtual void OnIncomingTcpConnection( 68 const net::IPEndPoint& address, 69 P2PSocketClient* client) OVERRIDE; 70 virtual void OnSendComplete() OVERRIDE; 71 virtual void OnError() OVERRIDE; 72 virtual void OnDataReceived(const net::IPEndPoint& address, 73 const std::vector<char>& data, 74 const base::TimeTicks& timestamp) OVERRIDE; 75 76 private: 77 enum InternalState { 78 IS_UNINITIALIZED, 79 IS_OPENING, 80 IS_OPEN, 81 IS_CLOSED, 82 IS_ERROR, 83 }; 84 85 // Update trace of send throttling internal state. This should be called 86 // immediately after any changes to |send_bytes_available_| and/or 87 // |in_flight_packet_sizes_|. 88 void TraceSendThrottlingState() const; 89 90 void InitAcceptedTcp(P2PSocketClient* client, 91 const talk_base::SocketAddress& local_address, 92 const talk_base::SocketAddress& remote_address); 93 P2PSocketType type_; 94 95 // Message loop on which this socket was created and being used. 96 base::MessageLoop* message_loop_; 97 98 // Corresponding P2P socket client. 99 scoped_refptr<P2PSocketClient> client_; 100 101 // Local address is allocated by the browser process, and the 102 // renderer side doesn't know the address until it receives OnOpen() 103 // event from the browser. 104 talk_base::SocketAddress local_address_; 105 106 // Remote address for client TCP connections. 107 talk_base::SocketAddress remote_address_; 108 109 // Current state of the object. 110 InternalState state_; 111 112 // Track the number of bytes allowed to be sent non-blocking. This is used to 113 // throttle the sending of packets to the browser process. For each packet 114 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs 115 // from the browser process) are made, the value is increased back. This 116 // allows short bursts of high-rate sending without dropping packets, but 117 // quickly restricts the client to a sustainable steady-state rate. 118 size_t send_bytes_available_; 119 std::deque<size_t> in_flight_packet_sizes_; 120 121 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the 122 // caller expects SignalWritable notification. 123 bool writable_signal_expected_; 124 125 // Current error code. Valid when state_ == IS_ERROR. 126 int error_; 127 128 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); 129 }; 130 131 IpcPacketSocket::IpcPacketSocket() 132 : type_(P2P_SOCKET_UDP), 133 message_loop_(base::MessageLoop::current()), 134 state_(IS_UNINITIALIZED), 135 send_bytes_available_(kMaximumInFlightBytes), 136 writable_signal_expected_(false), 137 error_(0) { 138 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate); 139 } 140 141 IpcPacketSocket::~IpcPacketSocket() { 142 if (state_ == IS_OPENING || state_ == IS_OPEN || 143 state_ == IS_ERROR) { 144 Close(); 145 } 146 } 147 148 void IpcPacketSocket::TraceSendThrottlingState() const { 149 TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(), 150 send_bytes_available_); 151 TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(), 152 in_flight_packet_sizes_.size()); 153 } 154 155 bool IpcPacketSocket::Init(P2PSocketType type, 156 P2PSocketClientImpl* client, 157 const talk_base::SocketAddress& local_address, 158 const talk_base::SocketAddress& remote_address) { 159 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 160 DCHECK_EQ(state_, IS_UNINITIALIZED); 161 162 type_ = type; 163 client_ = client; 164 local_address_ = local_address; 165 remote_address_ = remote_address; 166 state_ = IS_OPENING; 167 168 net::IPEndPoint local_endpoint; 169 if (!jingle_glue::SocketAddressToIPEndPoint( 170 local_address, &local_endpoint)) { 171 return false; 172 } 173 174 net::IPEndPoint remote_endpoint; 175 if (!remote_address.IsNil() && 176 !jingle_glue::SocketAddressToIPEndPoint( 177 remote_address, &remote_endpoint)) { 178 return false; 179 } 180 181 client->Init(type, local_endpoint, remote_endpoint, this); 182 183 return true; 184 } 185 186 void IpcPacketSocket::InitAcceptedTcp( 187 P2PSocketClient* client, 188 const talk_base::SocketAddress& local_address, 189 const talk_base::SocketAddress& remote_address) { 190 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 191 DCHECK_EQ(state_, IS_UNINITIALIZED); 192 193 client_ = client; 194 local_address_ = local_address; 195 remote_address_ = remote_address; 196 state_ = IS_OPEN; 197 TraceSendThrottlingState(); 198 client_->SetDelegate(this); 199 } 200 201 // talk_base::AsyncPacketSocket interface. 202 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { 203 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 204 return local_address_; 205 } 206 207 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { 208 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 209 return remote_address_; 210 } 211 212 int IpcPacketSocket::Send(const void *data, size_t data_size, 213 talk_base::DiffServCodePoint dscp) { 214 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 215 return SendTo(data, data_size, remote_address_, dscp); 216 } 217 218 int IpcPacketSocket::SendTo(const void *data, size_t data_size, 219 const talk_base::SocketAddress& address, 220 talk_base::DiffServCodePoint dscp) { 221 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 222 223 switch (state_) { 224 case IS_UNINITIALIZED: 225 NOTREACHED(); 226 return EWOULDBLOCK; 227 case IS_OPENING: 228 return EWOULDBLOCK; 229 case IS_CLOSED: 230 return ENOTCONN; 231 case IS_ERROR: 232 return error_; 233 case IS_OPEN: 234 // Continue sending the packet. 235 break; 236 } 237 238 if (data_size == 0) { 239 NOTREACHED(); 240 return 0; 241 } 242 243 if (data_size > send_bytes_available_) { 244 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", 245 TRACE_EVENT_SCOPE_THREAD, 246 "id", 247 client_->GetSocketID()); 248 writable_signal_expected_ = true; 249 error_ = EWOULDBLOCK; 250 return -1; 251 } 252 253 net::IPEndPoint address_chrome; 254 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { 255 NOTREACHED(); 256 error_ = EINVAL; 257 return -1; 258 } 259 260 send_bytes_available_ -= data_size; 261 in_flight_packet_sizes_.push_back(data_size); 262 TraceSendThrottlingState(); 263 264 const char* data_char = reinterpret_cast<const char*>(data); 265 std::vector<char> data_vector(data_char, data_char + data_size); 266 client_->SendWithDscp(address_chrome, data_vector, 267 static_cast<net::DiffServCodePoint>(dscp)); 268 269 // Fake successful send. The caller ignores result anyway. 270 return data_size; 271 } 272 273 int IpcPacketSocket::Close() { 274 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 275 276 client_->Close(); 277 state_ = IS_CLOSED; 278 279 return 0; 280 } 281 282 talk_base::AsyncPacketSocket::State IpcPacketSocket::GetState() const { 283 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 284 285 switch (state_) { 286 case IS_UNINITIALIZED: 287 NOTREACHED(); 288 return STATE_CLOSED; 289 290 case IS_OPENING: 291 return STATE_BINDING; 292 293 case IS_OPEN: 294 if (IsTcpClientSocket(type_)) { 295 return STATE_CONNECTED; 296 } else { 297 return STATE_BOUND; 298 } 299 300 case IS_CLOSED: 301 case IS_ERROR: 302 return STATE_CLOSED; 303 } 304 305 NOTREACHED(); 306 return STATE_CLOSED; 307 } 308 309 int IpcPacketSocket::GetOption(talk_base::Socket::Option opt, int* value) { 310 // We don't support socket options for IPC sockets. 311 return -1; 312 } 313 314 int IpcPacketSocket::SetOption(talk_base::Socket::Option opt, int value) { 315 // We don't support socket options for IPC sockets. 316 return -1; 317 } 318 319 int IpcPacketSocket::GetError() const { 320 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 321 return error_; 322 } 323 324 void IpcPacketSocket::SetError(int error) { 325 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 326 error_ = error; 327 } 328 329 void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) { 330 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 331 332 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { 333 // Always expect correct IPv4 address to be allocated. 334 NOTREACHED(); 335 OnError(); 336 return; 337 } 338 339 state_ = IS_OPEN; 340 TraceSendThrottlingState(); 341 342 SignalAddressReady(this, local_address_); 343 if (IsTcpClientSocket(type_)) 344 SignalConnect(this); 345 } 346 347 void IpcPacketSocket::OnIncomingTcpConnection( 348 const net::IPEndPoint& address, 349 P2PSocketClient* client) { 350 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 351 352 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 353 354 talk_base::SocketAddress remote_address; 355 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { 356 // Always expect correct IPv4 address to be allocated. 357 NOTREACHED(); 358 } 359 socket->InitAcceptedTcp(client, local_address_, remote_address); 360 SignalNewConnection(this, socket.release()); 361 } 362 363 void IpcPacketSocket::OnSendComplete() { 364 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 365 366 CHECK(!in_flight_packet_sizes_.empty()); 367 send_bytes_available_ += in_flight_packet_sizes_.front(); 368 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes); 369 in_flight_packet_sizes_.pop_front(); 370 TraceSendThrottlingState(); 371 372 if (writable_signal_expected_ && send_bytes_available_ > 0) { 373 SignalReadyToSend(this); 374 writable_signal_expected_ = false; 375 } 376 } 377 378 void IpcPacketSocket::OnError() { 379 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 380 state_ = IS_ERROR; 381 error_ = ECONNABORTED; 382 } 383 384 void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address, 385 const std::vector<char>& data, 386 const base::TimeTicks& timestamp) { 387 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 388 389 talk_base::SocketAddress address_lj; 390 if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) { 391 // We should always be able to convert address here because we 392 // don't expect IPv6 address on IPv4 connections. 393 NOTREACHED(); 394 return; 395 } 396 397 talk_base::PacketTime packet_time(timestamp.ToInternalValue(), 0); 398 SignalReadPacket(this, &data[0], data.size(), address_lj, 399 packet_time); 400 } 401 402 } // namespace 403 404 IpcPacketSocketFactory::IpcPacketSocketFactory( 405 P2PSocketDispatcher* socket_dispatcher) 406 : socket_dispatcher_(socket_dispatcher) { 407 } 408 409 IpcPacketSocketFactory::~IpcPacketSocketFactory() { 410 } 411 412 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket( 413 const talk_base::SocketAddress& local_address, int min_port, int max_port) { 414 talk_base::SocketAddress crome_address; 415 P2PSocketClientImpl* socket_client = 416 new P2PSocketClientImpl(socket_dispatcher_); 417 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 418 // TODO(sergeyu): Respect local_address and port limits here (need 419 // to pass them over IPC channel to the browser). 420 if (!socket->Init(P2P_SOCKET_UDP, socket_client, 421 local_address, talk_base::SocketAddress())) { 422 return NULL; 423 } 424 return socket.release(); 425 } 426 427 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket( 428 const talk_base::SocketAddress& local_address, int min_port, int max_port, 429 int opts) { 430 // TODO(sergeyu): Implement SSL support. 431 if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) 432 return NULL; 433 434 P2PSocketType type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 435 P2P_SOCKET_STUN_TCP_SERVER : P2P_SOCKET_TCP_SERVER; 436 P2PSocketClientImpl* socket_client = 437 new P2PSocketClientImpl(socket_dispatcher_); 438 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 439 if (!socket->Init(type, socket_client, local_address, 440 talk_base::SocketAddress())) { 441 return NULL; 442 } 443 return socket.release(); 444 } 445 446 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket( 447 const talk_base::SocketAddress& local_address, 448 const talk_base::SocketAddress& remote_address, 449 const talk_base::ProxyInfo& proxy_info, 450 const std::string& user_agent, int opts) { 451 P2PSocketType type; 452 if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) { 453 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 454 P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT; 455 } else if (opts & talk_base::PacketSocketFactory::OPT_TLS) { 456 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 457 P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT; 458 } else { 459 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 460 P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT; 461 } 462 P2PSocketClientImpl* socket_client = 463 new P2PSocketClientImpl(socket_dispatcher_); 464 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 465 if (!socket->Init(type, socket_client, local_address, remote_address)) 466 return NULL; 467 return socket.release(); 468 } 469 470 talk_base::AsyncResolverInterface* 471 IpcPacketSocketFactory::CreateAsyncResolver() { 472 NOTREACHED(); 473 return NULL; 474 } 475 476 } // namespace content 477