Home | History | Annotate | Download | only in p2p
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/renderer/p2p/ipc_socket_factory.h"
      6 
      7 #include <deque>
      8 
      9 #include "base/compiler_specific.h"
     10 #include "base/debug/trace_event.h"
     11 #include "base/message_loop/message_loop.h"
     12 #include "base/message_loop/message_loop_proxy.h"
     13 #include "content/renderer/p2p/socket_client.h"
     14 #include "content/renderer/p2p/socket_dispatcher.h"
     15 #include "jingle/glue/utils.h"
     16 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
     17 
     18 namespace content {
     19 
     20 namespace {
     21 
     22 bool IsTcpClientSocket(P2PSocketType type) {
     23   return (type == P2P_SOCKET_STUN_TCP_CLIENT) ||
     24          (type == P2P_SOCKET_TCP_CLIENT) ||
     25          (type == P2P_SOCKET_STUN_SSLTCP_CLIENT) ||
     26          (type == P2P_SOCKET_SSLTCP_CLIENT) ||
     27          (type == P2P_SOCKET_TLS_CLIENT) ||
     28          (type == P2P_SOCKET_STUN_TLS_CLIENT);
     29 }
     30 
     31 // TODO(miu): This needs tuning.  http://crbug.com/237960
     32 const size_t kMaximumInFlightBytes = 64 * 1024;  // 64 KB
     33 
     34 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface
     35 // using P2PSocketClient that works over IPC-channel. It must be used
     36 // on the thread it was created.
     37 class IpcPacketSocket : public talk_base::AsyncPacketSocket,
     38                         public P2PSocketClient::Delegate {
     39  public:
     40   IpcPacketSocket();
     41   virtual ~IpcPacketSocket();
     42 
     43   // Always takes ownership of client even if initialization fails.
     44   bool Init(P2PSocketType type, P2PSocketClient* client,
     45             const talk_base::SocketAddress& local_address,
     46             const talk_base::SocketAddress& remote_address);
     47 
     48   // talk_base::AsyncPacketSocket interface.
     49   virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE;
     50   virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE;
     51   virtual int Send(const void *pv, size_t cb) OVERRIDE;
     52   virtual int SendTo(const void *pv, size_t cb,
     53                      const talk_base::SocketAddress& addr) OVERRIDE;
     54   virtual int Close() OVERRIDE;
     55   virtual State GetState() const OVERRIDE;
     56   virtual int GetOption(talk_base::Socket::Option opt, int* value) OVERRIDE;
     57   virtual int SetOption(talk_base::Socket::Option opt, int value) OVERRIDE;
     58   virtual int GetError() const OVERRIDE;
     59   virtual void SetError(int error) OVERRIDE;
     60 
     61   // P2PSocketClient::Delegate implementation.
     62   virtual void OnOpen(const net::IPEndPoint& address) OVERRIDE;
     63   virtual void OnIncomingTcpConnection(const net::IPEndPoint& address,
     64                                        P2PSocketClient* client) OVERRIDE;
     65   virtual void OnSendComplete() OVERRIDE;
     66   virtual void OnError() OVERRIDE;
     67   virtual void OnDataReceived(const net::IPEndPoint& address,
     68                               const std::vector<char>& data) OVERRIDE;
     69 
     70  private:
     71   enum InternalState {
     72     IS_UNINITIALIZED,
     73     IS_OPENING,
     74     IS_OPEN,
     75     IS_CLOSED,
     76     IS_ERROR,
     77   };
     78 
     79   // Update trace of send throttling internal state. This should be called
     80   // immediately after any changes to |send_bytes_available_| and/or
     81   // |in_flight_packet_sizes_|.
     82   void TraceSendThrottlingState() const;
     83 
     84   void InitAcceptedTcp(P2PSocketClient* client,
     85                        const talk_base::SocketAddress& local_address,
     86                        const talk_base::SocketAddress& remote_address);
     87   P2PSocketType type_;
     88 
     89   // Message loop on which this socket was created and being used.
     90   base::MessageLoop* message_loop_;
     91 
     92   // Corresponding P2P socket client.
     93   scoped_refptr<P2PSocketClient> client_;
     94 
     95   // Local address is allocated by the browser process, and the
     96   // renderer side doesn't know the address until it receives OnOpen()
     97   // event from the browser.
     98   talk_base::SocketAddress local_address_;
     99 
    100   // Remote address for client TCP connections.
    101   talk_base::SocketAddress remote_address_;
    102 
    103   // Current state of the object.
    104   InternalState state_;
    105 
    106   // Track the number of bytes allowed to be sent non-blocking. This is used to
    107   // throttle the sending of packets to the browser process. For each packet
    108   // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
    109   // from the browser process) are made, the value is increased back. This
    110   // allows short bursts of high-rate sending without dropping packets, but
    111   // quickly restricts the client to a sustainable steady-state rate.
    112   size_t send_bytes_available_;
    113   std::deque<size_t> in_flight_packet_sizes_;
    114 
    115   // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
    116   // caller expects SignalWritable notification.
    117   bool writable_signal_expected_;
    118 
    119   // Current error code. Valid when state_ == IS_ERROR.
    120   int error_;
    121 
    122   DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
    123 };
    124 
    125 IpcPacketSocket::IpcPacketSocket()
    126     : type_(P2P_SOCKET_UDP),
    127       message_loop_(base::MessageLoop::current()),
    128       state_(IS_UNINITIALIZED),
    129       send_bytes_available_(kMaximumInFlightBytes),
    130       writable_signal_expected_(false),
    131       error_(0) {
    132   COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
    133 }
    134 
    135 IpcPacketSocket::~IpcPacketSocket() {
    136   if (state_ == IS_OPENING || state_ == IS_OPEN ||
    137       state_ == IS_ERROR) {
    138     Close();
    139   }
    140 }
    141 
    142 void IpcPacketSocket::TraceSendThrottlingState() const {
    143   TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(),
    144                     send_bytes_available_);
    145   TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(),
    146                     in_flight_packet_sizes_.size());
    147 }
    148 
    149 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client,
    150                            const talk_base::SocketAddress& local_address,
    151                            const talk_base::SocketAddress& remote_address) {
    152   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    153   DCHECK_EQ(state_, IS_UNINITIALIZED);
    154 
    155   type_ = type;
    156   client_ = client;
    157   local_address_ = local_address;
    158   remote_address_ = remote_address;
    159   state_ = IS_OPENING;
    160 
    161   net::IPEndPoint local_endpoint;
    162   if (!jingle_glue::SocketAddressToIPEndPoint(
    163           local_address, &local_endpoint)) {
    164     return false;
    165   }
    166 
    167   net::IPEndPoint remote_endpoint;
    168   if (!remote_address.IsNil() &&
    169       !jingle_glue::SocketAddressToIPEndPoint(
    170           remote_address, &remote_endpoint)) {
    171     return false;
    172   }
    173 
    174   client_->Init(type, local_endpoint, remote_endpoint, this);
    175 
    176   return true;
    177 }
    178 
    179 void IpcPacketSocket::InitAcceptedTcp(
    180     P2PSocketClient* client,
    181     const talk_base::SocketAddress& local_address,
    182     const talk_base::SocketAddress& remote_address) {
    183   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    184   DCHECK_EQ(state_, IS_UNINITIALIZED);
    185 
    186   client_ = client;
    187   local_address_ = local_address;
    188   remote_address_ = remote_address;
    189   state_ = IS_OPEN;
    190   TraceSendThrottlingState();
    191   client_->set_delegate(this);
    192 }
    193 
    194 // talk_base::AsyncPacketSocket interface.
    195 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
    196   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    197   return local_address_;
    198 }
    199 
    200 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
    201   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    202   return remote_address_;
    203 }
    204 
    205 int IpcPacketSocket::Send(const void *data, size_t data_size) {
    206   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    207   return SendTo(data, data_size, remote_address_);
    208 }
    209 
    210 int IpcPacketSocket::SendTo(const void *data, size_t data_size,
    211                             const talk_base::SocketAddress& address) {
    212   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    213 
    214   switch (state_) {
    215     case IS_UNINITIALIZED:
    216       NOTREACHED();
    217       return EWOULDBLOCK;
    218     case IS_OPENING:
    219       return EWOULDBLOCK;
    220     case IS_CLOSED:
    221       return ENOTCONN;
    222     case IS_ERROR:
    223       return error_;
    224     case IS_OPEN:
    225       // Continue sending the packet.
    226       break;
    227   }
    228 
    229   if (data_size == 0) {
    230     NOTREACHED();
    231     return 0;
    232   }
    233 
    234   if (data_size > send_bytes_available_) {
    235     TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
    236                          TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id());
    237     writable_signal_expected_ = true;
    238     error_ = EWOULDBLOCK;
    239     return -1;
    240   }
    241 
    242   net::IPEndPoint address_chrome;
    243   if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
    244     NOTREACHED();
    245     error_ = EINVAL;
    246     return -1;
    247   }
    248 
    249   send_bytes_available_ -= data_size;
    250   in_flight_packet_sizes_.push_back(data_size);
    251   TraceSendThrottlingState();
    252 
    253   const char* data_char = reinterpret_cast<const char*>(data);
    254   std::vector<char> data_vector(data_char, data_char + data_size);
    255   client_->Send(address_chrome, data_vector);
    256 
    257   // Fake successful send. The caller ignores result anyway.
    258   return data_size;
    259 }
    260 
    261 int IpcPacketSocket::Close() {
    262   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    263 
    264   client_->Close();
    265   state_ = IS_CLOSED;
    266 
    267   return 0;
    268 }
    269 
    270 talk_base::AsyncPacketSocket::State IpcPacketSocket::GetState() const {
    271   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    272 
    273   switch (state_) {
    274     case IS_UNINITIALIZED:
    275       NOTREACHED();
    276       return STATE_CLOSED;
    277 
    278     case IS_OPENING:
    279       return STATE_BINDING;
    280 
    281     case IS_OPEN:
    282       if (IsTcpClientSocket(type_)) {
    283         return STATE_CONNECTED;
    284       } else {
    285         return STATE_BOUND;
    286       }
    287 
    288     case IS_CLOSED:
    289     case IS_ERROR:
    290       return STATE_CLOSED;
    291   }
    292 
    293   NOTREACHED();
    294   return STATE_CLOSED;
    295 }
    296 
    297 int IpcPacketSocket::GetOption(talk_base::Socket::Option opt, int* value) {
    298   // We don't support socket options for IPC sockets.
    299   return -1;
    300 }
    301 
    302 int IpcPacketSocket::SetOption(talk_base::Socket::Option opt, int value) {
    303   // We don't support socket options for IPC sockets.
    304   return -1;
    305 }
    306 
    307 int IpcPacketSocket::GetError() const {
    308   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    309   return error_;
    310 }
    311 
    312 void IpcPacketSocket::SetError(int error) {
    313   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    314   error_ = error;
    315 }
    316 
    317 void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) {
    318   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    319 
    320   if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) {
    321     // Always expect correct IPv4 address to be allocated.
    322     NOTREACHED();
    323     OnError();
    324     return;
    325   }
    326 
    327   state_ = IS_OPEN;
    328   TraceSendThrottlingState();
    329 
    330   SignalAddressReady(this, local_address_);
    331   if (IsTcpClientSocket(type_))
    332     SignalConnect(this);
    333 }
    334 
    335 void IpcPacketSocket::OnIncomingTcpConnection(
    336     const net::IPEndPoint& address,
    337     P2PSocketClient* client) {
    338   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    339 
    340   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    341 
    342   talk_base::SocketAddress remote_address;
    343   if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
    344     // Always expect correct IPv4 address to be allocated.
    345     NOTREACHED();
    346   }
    347   socket->InitAcceptedTcp(client, local_address_, remote_address);
    348   SignalNewConnection(this, socket.release());
    349 }
    350 
    351 void IpcPacketSocket::OnSendComplete() {
    352   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    353 
    354   CHECK(!in_flight_packet_sizes_.empty());
    355   send_bytes_available_ += in_flight_packet_sizes_.front();
    356   DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
    357   in_flight_packet_sizes_.pop_front();
    358   TraceSendThrottlingState();
    359 
    360   if (writable_signal_expected_ && send_bytes_available_ > 0) {
    361     SignalReadyToSend(this);
    362     writable_signal_expected_ = false;
    363   }
    364 }
    365 
    366 void IpcPacketSocket::OnError() {
    367   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    368   state_ = IS_ERROR;
    369   error_ = ECONNABORTED;
    370 }
    371 
    372 void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address,
    373                                      const std::vector<char>& data) {
    374   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
    375 
    376   talk_base::SocketAddress address_lj;
    377   if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) {
    378     // We should always be able to convert address here because we
    379     // don't expect IPv6 address on IPv4 connections.
    380     NOTREACHED();
    381     return;
    382   }
    383 
    384   SignalReadPacket(this, &data[0], data.size(), address_lj);
    385 }
    386 
    387 }  // namespace
    388 
    389 IpcPacketSocketFactory::IpcPacketSocketFactory(
    390     P2PSocketDispatcher* socket_dispatcher)
    391     : socket_dispatcher_(socket_dispatcher) {
    392 }
    393 
    394 IpcPacketSocketFactory::~IpcPacketSocketFactory() {
    395 }
    396 
    397 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket(
    398     const talk_base::SocketAddress& local_address, int min_port, int max_port) {
    399   talk_base::SocketAddress crome_address;
    400   P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
    401   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    402   // TODO(sergeyu): Respect local_address and port limits here (need
    403   // to pass them over IPC channel to the browser).
    404   if (!socket->Init(P2P_SOCKET_UDP, socket_client,
    405                     local_address, talk_base::SocketAddress())) {
    406     return NULL;
    407   }
    408   return socket.release();
    409 }
    410 
    411 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket(
    412     const talk_base::SocketAddress& local_address, int min_port, int max_port,
    413     int opts) {
    414   // TODO(sergeyu): Implement SSL support.
    415   if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP)
    416     return NULL;
    417 
    418   P2PSocketType type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    419       P2P_SOCKET_STUN_TCP_SERVER : P2P_SOCKET_TCP_SERVER;
    420   P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
    421   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    422   if (!socket->Init(type, socket_client, local_address,
    423                     talk_base::SocketAddress())) {
    424     return NULL;
    425   }
    426   return socket.release();
    427 }
    428 
    429 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket(
    430     const talk_base::SocketAddress& local_address,
    431     const talk_base::SocketAddress& remote_address,
    432     const talk_base::ProxyInfo& proxy_info,
    433     const std::string& user_agent, int opts) {
    434   P2PSocketType type;
    435   if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) {
    436     type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    437         P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT;
    438   } else if (opts & talk_base::PacketSocketFactory::OPT_TLS) {
    439     type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    440         P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT;
    441   } else {
    442     type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
    443         P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT;
    444   }
    445   P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
    446   scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
    447   if (!socket->Init(type, socket_client, local_address,
    448                     remote_address))
    449     return NULL;
    450   return socket.release();
    451 }
    452 
    453 }  // namespace content
    454