Home | History | Annotate | Download | only in p2p
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/renderer/p2p/ipc_socket_factory.h"
      6 
      7 #include <deque>
      8 
      9 #include "base/compiler_specific.h"
     10 #include "base/debug/trace_event.h"
     11 #include "base/message_loop/message_loop.h"
     12 #include "base/message_loop/message_loop_proxy.h"
     13 #include "content/public/renderer/p2p_socket_client_delegate.h"
     14 #include "content/renderer/p2p/host_address_request.h"
     15 #include "content/renderer/p2p/socket_client_impl.h"
     16 #include "content/renderer/p2p/socket_dispatcher.h"
     17 #include "jingle/glue/utils.h"
     18 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
     19 
     20 namespace content {
     21 
     22 namespace {
     23 
     24 bool IsTcpClientSocket(P2PSocketType type) {
     25   return (type == P2P_SOCKET_STUN_TCP_CLIENT) ||
     26          (type == P2P_SOCKET_TCP_CLIENT) ||
     27          (type == P2P_SOCKET_STUN_SSLTCP_CLIENT) ||
     28          (type == P2P_SOCKET_SSLTCP_CLIENT) ||
     29          (type == P2P_SOCKET_TLS_CLIENT) ||
     30          (type == P2P_SOCKET_STUN_TLS_CLIENT);
     31 }
     32 
     33 // TODO(miu): This needs tuning.  http://crbug.com/237960
     34 const size_t kMaximumInFlightBytes = 64 * 1024;  // 64 KB
     35 
     36 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface
     37 // using P2PSocketClient that works over IPC-channel. It must be used
     38 // on the thread it was created.
     39 class IpcPacketSocket : public talk_base::AsyncPacketSocket,
     40                         public P2PSocketClientDelegate {
     41  public:
     42   IpcPacketSocket();
     43   virtual ~IpcPacketSocket();
     44 
     45   // Always takes ownership of client even if initialization fails.
     46   bool Init(P2PSocketType type, P2PSocketClientImpl* client,
     47             const talk_base::SocketAddress& local_address,
     48             const talk_base::SocketAddress& remote_address);
     49 
     50   // talk_base::AsyncPacketSocket interface.
     51   virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE;
     52   virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE;
     53   virtual int Send(const void *pv, size_t cb,
     54                    talk_base::DiffServCodePoint dscp) OVERRIDE;
     55   virtual int SendTo(const void *pv, size_t cb,
     56                      const talk_base::SocketAddress& addr,
     57                      talk_base::DiffServCodePoint dscp) OVERRIDE;
     58   virtual int Close() OVERRIDE;
     59   virtual State GetState() const OVERRIDE;
     60   virtual int GetOption(talk_base::Socket::Option opt, int* value) OVERRIDE;
     61   virtual int SetOption(talk_base::Socket::Option opt, int value) OVERRIDE;
     62   virtual int GetError() const OVERRIDE;
     63   virtual void SetError(int error) OVERRIDE;
     64 
     65   // P2PSocketClientDelegate implementation.
     66   virtual void OnOpen(const net::IPEndPoint& address) OVERRIDE;
     67   virtual void OnIncomingTcpConnection(
     68       const net::IPEndPoint& address,
     69       P2PSocketClient* client) OVERRIDE;
     70   virtual void OnSendComplete() OVERRIDE;
     71   virtual void OnError() OVERRIDE;
     72   virtual void OnDataReceived(const net::IPEndPoint& address,
     73                               const std::vector<char>& data,
     74                               const base::TimeTicks& timestamp) OVERRIDE;
     75 
     76  private:
     77   enum InternalState {
     78     IS_UNINITIALIZED,
     79     IS_OPENING,
     80     IS_OPEN,
     81     IS_CLOSED,
     82     IS_ERROR,
     83   };
     84 
     85   // Update trace of send throttling internal state. This should be called
     86   // immediately after any changes to |send_bytes_available_| and/or
     87   // |in_flight_packet_sizes_|.
     88   void TraceSendThrottlingState() const;
     89 
     90   void InitAcceptedTcp(P2PSocketClient* client,
     91                        const talk_base::SocketAddress& local_address,
     92                        const talk_base::SocketAddress& remote_address);
     93   P2PSocketType type_;
     94 
     95   // Message loop on which this socket was created and being used.
     96   base::MessageLoop* message_loop_;
     97 
     98   // Corresponding P2P socket client.
     99   scoped_refptr<P2PSocketClient> client_;
    100 
    101   // Local address is allocated by the browser process, and the
    102   // renderer side doesn't know the address until it receives OnOpen()
    103   // event from the browser.
    104   talk_base::SocketAddress local_address_;
    105 
    106   // Remote address for client TCP connections.
    107   talk_base::SocketAddress remote_address_;
    108 
    109   // Current state of the object.
    110   InternalState state_;
    111 
    112   // Track the number of bytes allowed to be sent non-blocking. This is used to
    113   // throttle the sending of packets to the browser process. For each packet
    114   // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
    115   // from the browser process) are made, the value is increased back. This
    116   // allows short bursts of high-rate sending without dropping packets, but
    117   // quickly restricts the client to a sustainable steady-state rate.
    118   size_t send_bytes_available_;
    119   std::deque<size_t> in_flight_packet_sizes_;
    120 
    121   // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
    122   // caller expects SignalWritable notification.
    123   bool writable_signal_expected_;
    124 
    125   // Current error code. Valid when state_ == IS_ERROR.
    126   int error_;
    127 
    128   DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
    129 };
    130 
    131 IpcPacketSocket::IpcPacketSocket()
    132     : type_(P2P_SOCKET_UDP),
    133       message_loop_(base::MessageLoop::current()),
    134       state_(IS_UNINITIALIZED),
    135       send_bytes_available_(kMaximumInFlightBytes),
    136       writable_signal_expected_(false),
    137       error_(0) {
    138   COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
    139 }
    140 
    141 IpcPacketSocket::~IpcPacketSocket() {
    142   if (state_ == IS_OPENING || state_ == IS_OPEN ||
    143       state_ == IS_ERROR) {
    144     Close();
    145   }
    146 }
    147 
    148 void IpcPacketSocket::TraceSendThrottlingState() const {
    149   TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(),
    150                     send_bytes_available_);
    151   TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(),
    152                     in_flight_packet_sizes_.size());
    153 }
    154 
    155 bool IpcPacketSocket::Init(P2PSocketType type,
    156                            P2PSocketClientImpl* client,
    157                            const talk_base::SocketAddress& local_address,
    158                            const talk_base::SocketAddress& remote_address) {
    159   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    160   DCHECK_EQ(state_, IS_UNINITIALIZED);
    161 
    162   type_ = type;
    163   client_ = client;
    164   local_address_ = local_address;
    165   remote_address_ = remote_address;
    166   state_ = IS_OPENING;
    167 
    168   net::IPEndPoint local_endpoint;
    169   if (!jingle_glue::SocketAddressToIPEndPoint(
    170           local_address, &local_endpoint)) {
    171     return false;
    172   }
    173 
    174   net::IPEndPoint remote_endpoint;
    175   if (!remote_address.IsNil() &&
    176       !jingle_glue::SocketAddressToIPEndPoint(
    177           remote_address, &remote_endpoint)) {
    178     return false;
    179   }
    180 
    181   client->Init(type, local_endpoint, remote_endpoint, this);
    182 
    183   return true;
    184 }
    185 
    186 void IpcPacketSocket::InitAcceptedTcp(
    187     P2PSocketClient* client,
    188     const talk_base::SocketAddress& local_address,
    189     const talk_base::SocketAddress& remote_address) {
    190   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    191   DCHECK_EQ(state_, IS_UNINITIALIZED);
    192 
    193   client_ = client;
    194   local_address_ = local_address;
    195   remote_address_ = remote_address;
    196   state_ = IS_OPEN;
    197   TraceSendThrottlingState();
    198   client_->SetDelegate(this);
    199 }
    200 
    201 // talk_base::AsyncPacketSocket interface.
    202 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
    203   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    204   return local_address_;
    205 }
    206 
    207 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
    208   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    209   return remote_address_;
    210 }
    211 
    212 int IpcPacketSocket::Send(const void *data, size_t data_size,
    213                           talk_base::DiffServCodePoint dscp) {
    214   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    215   return SendTo(data, data_size, remote_address_, dscp);
    216 }
    217 
    218 int IpcPacketSocket::SendTo(const void *data, size_t data_size,
    219                             const talk_base::SocketAddress& address,
    220                             talk_base::DiffServCodePoint dscp) {
    221   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    222 
    223   switch (state_) {
    224     case IS_UNINITIALIZED:
    225       NOTREACHED();
    226       return EWOULDBLOCK;
    227     case IS_OPENING:
    228       return EWOULDBLOCK;
    229     case IS_CLOSED:
    230       return ENOTCONN;
    231     case IS_ERROR:
    232       return error_;
    233     case IS_OPEN:
    234       // Continue sending the packet.
    235       break;
    236   }
    237 
    238   if (data_size == 0) {
    239     NOTREACHED();
    240     return 0;
    241   }
    242 
    243   if (data_size > send_bytes_available_) {
    244     TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
    245                          TRACE_EVENT_SCOPE_THREAD,
    246                          "id",
    247                          client_->GetSocketID());
    248     writable_signal_expected_ = true;
    249     error_ = EWOULDBLOCK;
    250     return -1;
    251   }
    252 
    253   net::IPEndPoint address_chrome;
    254   if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
    255     NOTREACHED();
    256     error_ = EINVAL;
    257     return -1;
    258   }
    259 
    260   send_bytes_available_ -= data_size;
    261   in_flight_packet_sizes_.push_back(data_size);
    262   TraceSendThrottlingState();
    263 
    264   const char* data_char = reinterpret_cast<const char*>(data);
    265   std::vector<char> data_vector(data_char, data_char + data_size);
    266   client_->SendWithDscp(address_chrome, data_vector,
    267                         static_cast<net::DiffServCodePoint>(dscp));
    268 
    269   // Fake successful send. The caller ignores result anyway.
    270   return data_size;
    271 }
    272 
    273 int IpcPacketSocket::Close() {
    274   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    275 
    276   client_->Close();
    277   state_ = IS_CLOSED;
    278 
    279   return 0;
    280 }
    281 
    282 talk_base::AsyncPacketSocket::State IpcPacketSocket::GetState() const {
    283   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    284 
    285   switch (state_) {
    286     case IS_UNINITIALIZED:
    287       NOTREACHED();
    288       return STATE_CLOSED;
    289 
    290     case IS_OPENING:
    291       return STATE_BINDING;
    292 
    293     case IS_OPEN:
    294       if (IsTcpClientSocket(type_)) {
    295         return STATE_CONNECTED;
    296       } else {
    297         return STATE_BOUND;
    298       }
    299 
    300     case IS_CLOSED:
    301     case IS_ERROR:
    302       return STATE_CLOSED;
    303   }
    304 
    305   NOTREACHED();
    306   return STATE_CLOSED;
    307 }
    308 
    309 int IpcPacketSocket::GetOption(talk_base::Socket::Option opt, int* value) {
    310   // We don't support socket options for IPC sockets.
    311   return -1;
    312 }
    313 
    314 int IpcPacketSocket::SetOption(talk_base::Socket::Option opt, int value) {
    315   // We don't support socket options for IPC sockets.
    316   return -1;
    317 }
    318 
    319 int IpcPacketSocket::GetError() const {
    320   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    321   return error_;
    322 }
    323 
    324 void IpcPacketSocket::SetError(int error) {
    325   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    326   error_ = error;
    327 }
    328 
    329 void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) {
    330   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    331 
    332   if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) {
    333     // Always expect correct IPv4 address to be allocated.
    334     NOTREACHED();
    335     OnError();
    336     return;
    337   }
    338 
    339   state_ = IS_OPEN;
    340   TraceSendThrottlingState();
    341 
    342   SignalAddressReady(this, local_address_);
    343   if (IsTcpClientSocket(type_))
    344     SignalConnect(this);
    345 }
    346 
    347 void IpcPacketSocket::OnIncomingTcpConnection(
    348     const net::IPEndPoint& address,
    349     P2PSocketClient* client) {
    350   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    351 
    352   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    353 
    354   talk_base::SocketAddress remote_address;
    355   if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
    356     // Always expect correct IPv4 address to be allocated.
    357     NOTREACHED();
    358   }
    359   socket->InitAcceptedTcp(client, local_address_, remote_address);
    360   SignalNewConnection(this, socket.release());
    361 }
    362 
    363 void IpcPacketSocket::OnSendComplete() {
    364   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    365 
    366   CHECK(!in_flight_packet_sizes_.empty());
    367   send_bytes_available_ += in_flight_packet_sizes_.front();
    368   DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
    369   in_flight_packet_sizes_.pop_front();
    370   TraceSendThrottlingState();
    371 
    372   if (writable_signal_expected_ && send_bytes_available_ > 0) {
    373     SignalReadyToSend(this);
    374     writable_signal_expected_ = false;
    375   }
    376 }
    377 
    378 void IpcPacketSocket::OnError() {
    379   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    380   state_ = IS_ERROR;
    381   error_ = ECONNABORTED;
    382 }
    383 
    384 void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address,
    385                                      const std::vector<char>& data,
    386                                      const base::TimeTicks& timestamp) {
    387   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    388 
    389   talk_base::SocketAddress address_lj;
    390   if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) {
    391     // We should always be able to convert address here because we
    392     // don't expect IPv6 address on IPv4 connections.
    393     NOTREACHED();
    394     return;
    395   }
    396 
    397   talk_base::PacketTime packet_time(timestamp.ToInternalValue(), 0);
    398   SignalReadPacket(this, &data[0], data.size(), address_lj,
    399                    packet_time);
    400 }
    401 
    402 }  // namespace
    403 
    404 IpcPacketSocketFactory::IpcPacketSocketFactory(
    405     P2PSocketDispatcher* socket_dispatcher)
    406     : socket_dispatcher_(socket_dispatcher) {
    407 }
    408 
    409 IpcPacketSocketFactory::~IpcPacketSocketFactory() {
    410 }
    411 
    412 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket(
    413     const talk_base::SocketAddress& local_address, int min_port, int max_port) {
    414   talk_base::SocketAddress crome_address;
    415   P2PSocketClientImpl* socket_client =
    416       new P2PSocketClientImpl(socket_dispatcher_);
    417   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    418   // TODO(sergeyu): Respect local_address and port limits here (need
    419   // to pass them over IPC channel to the browser).
    420   if (!socket->Init(P2P_SOCKET_UDP, socket_client,
    421                     local_address, talk_base::SocketAddress())) {
    422     return NULL;
    423   }
    424   return socket.release();
    425 }
    426 
    427 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket(
    428     const talk_base::SocketAddress& local_address, int min_port, int max_port,
    429     int opts) {
    430   // TODO(sergeyu): Implement SSL support.
    431   if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP)
    432     return NULL;
    433 
    434   P2PSocketType type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    435       P2P_SOCKET_STUN_TCP_SERVER : P2P_SOCKET_TCP_SERVER;
    436   P2PSocketClientImpl* socket_client =
    437       new P2PSocketClientImpl(socket_dispatcher_);
    438   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    439   if (!socket->Init(type, socket_client, local_address,
    440                     talk_base::SocketAddress())) {
    441     return NULL;
    442   }
    443   return socket.release();
    444 }
    445 
    446 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket(
    447     const talk_base::SocketAddress& local_address,
    448     const talk_base::SocketAddress& remote_address,
    449     const talk_base::ProxyInfo& proxy_info,
    450     const std::string& user_agent, int opts) {
    451   P2PSocketType type;
    452   if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) {
    453     type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    454         P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT;
    455   } else if (opts & talk_base::PacketSocketFactory::OPT_TLS) {
    456     type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    457         P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT;
    458   } else {
    459     type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    460         P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT;
    461   }
    462   P2PSocketClientImpl* socket_client =
    463       new P2PSocketClientImpl(socket_dispatcher_);
    464   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    465   if (!socket->Init(type, socket_client, local_address, remote_address))
    466     return NULL;
    467   return socket.release();
    468 }
    469 
    470 talk_base::AsyncResolverInterface*
    471 IpcPacketSocketFactory::CreateAsyncResolver() {
    472   NOTREACHED();
    473   return NULL;
    474 }
    475 
    476 }  // namespace content
    477