1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/renderer/p2p/socket_client_impl.h" 6 7 #include "base/bind.h" 8 #include "base/message_loop/message_loop_proxy.h" 9 #include "content/common/p2p_messages.h" 10 #include "content/public/renderer/p2p_socket_client_delegate.h" 11 #include "content/renderer/p2p/socket_dispatcher.h" 12 #include "content/renderer/render_thread_impl.h" 13 #include "crypto/random.h" 14 15 namespace { 16 17 uint64 GetUniqueId(uint32 random_socket_id, uint32 packet_id) { 18 uint64 uid = random_socket_id; 19 uid <<= 32; 20 uid |= packet_id; 21 return uid; 22 } 23 24 } // namespace 25 26 namespace content { 27 28 scoped_refptr<P2PSocketClient> P2PSocketClient::Create( 29 P2PSocketType type, 30 const net::IPEndPoint& local_address, 31 const net::IPEndPoint& remote_address, 32 P2PSocketClientDelegate* delegate) { 33 P2PSocketClientImpl *impl = new P2PSocketClientImpl( 34 RenderThreadImpl::current()->p2p_socket_dispatcher()); 35 impl->Init(type, local_address, remote_address, delegate); 36 return impl; 37 } 38 39 P2PSocketClientImpl::P2PSocketClientImpl(P2PSocketDispatcher* dispatcher) 40 : dispatcher_(dispatcher), 41 ipc_message_loop_(dispatcher->message_loop()), 42 delegate_message_loop_(base::MessageLoopProxy::current()), 43 socket_id_(0), delegate_(NULL), 44 state_(STATE_UNINITIALIZED), 45 random_socket_id_(0), 46 next_packet_id_(0) { 47 crypto::RandBytes(&random_socket_id_, sizeof(random_socket_id_)); 48 } 49 50 P2PSocketClientImpl::~P2PSocketClientImpl() { 51 CHECK(state_ == STATE_CLOSED || state_ == STATE_UNINITIALIZED); 52 } 53 54 void P2PSocketClientImpl::Init( 55 P2PSocketType type, 56 const net::IPEndPoint& local_address, 57 const net::IPEndPoint& remote_address, 58 P2PSocketClientDelegate* delegate) { 59 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 60 // |delegate_| is only accessesed on |delegate_message_loop_|. 61 delegate_ = delegate; 62 63 ipc_message_loop_->PostTask( 64 FROM_HERE, base::Bind(&P2PSocketClientImpl::DoInit, 65 this, 66 type, 67 local_address, 68 remote_address)); 69 } 70 71 void P2PSocketClientImpl::DoInit(P2PSocketType type, 72 const net::IPEndPoint& local_address, 73 const net::IPEndPoint& remote_address) { 74 DCHECK_EQ(state_, STATE_UNINITIALIZED); 75 DCHECK(delegate_); 76 state_ = STATE_OPENING; 77 socket_id_ = dispatcher_->RegisterClient(this); 78 dispatcher_->SendP2PMessage(new P2PHostMsg_CreateSocket( 79 type, socket_id_, local_address, remote_address)); 80 } 81 82 void P2PSocketClientImpl::SendWithDscp( 83 const net::IPEndPoint& address, 84 const std::vector<char>& data, 85 net::DiffServCodePoint dscp) { 86 if (!ipc_message_loop_->BelongsToCurrentThread()) { 87 ipc_message_loop_->PostTask( 88 FROM_HERE, base::Bind( 89 &P2PSocketClientImpl::SendWithDscp, this, address, data, dscp)); 90 return; 91 } 92 93 // Can send data only when the socket is open. 94 DCHECK(state_ == STATE_OPEN || state_ == STATE_ERROR); 95 if (state_ == STATE_OPEN) { 96 uint64 unique_id = GetUniqueId(random_socket_id_, ++next_packet_id_); 97 TRACE_EVENT_ASYNC_BEGIN0("p2p", "Send", unique_id); 98 dispatcher_->SendP2PMessage(new P2PHostMsg_Send(socket_id_, address, data, 99 dscp, unique_id)); 100 } 101 } 102 103 void P2PSocketClientImpl::Send(const net::IPEndPoint& address, 104 const std::vector<char>& data) { 105 SendWithDscp(address, data, net::DSCP_DEFAULT); 106 } 107 108 void P2PSocketClientImpl::Close() { 109 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 110 111 delegate_ = NULL; 112 113 ipc_message_loop_->PostTask( 114 FROM_HERE, base::Bind(&P2PSocketClientImpl::DoClose, this)); 115 } 116 117 void P2PSocketClientImpl::DoClose() { 118 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 119 if (dispatcher_) { 120 if (state_ == STATE_OPEN || state_ == STATE_OPENING || 121 state_ == STATE_ERROR) { 122 dispatcher_->SendP2PMessage(new P2PHostMsg_DestroySocket(socket_id_)); 123 } 124 dispatcher_->UnregisterClient(socket_id_); 125 } 126 127 state_ = STATE_CLOSED; 128 } 129 130 int P2PSocketClientImpl::GetSocketID() const { 131 return socket_id_; 132 } 133 134 void P2PSocketClientImpl::SetDelegate(P2PSocketClientDelegate* delegate) { 135 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 136 delegate_ = delegate; 137 } 138 139 void P2PSocketClientImpl::OnSocketCreated(const net::IPEndPoint& address) { 140 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 141 DCHECK_EQ(state_, STATE_OPENING); 142 state_ = STATE_OPEN; 143 144 delegate_message_loop_->PostTask( 145 FROM_HERE, 146 base::Bind(&P2PSocketClientImpl::DeliverOnSocketCreated, this, address)); 147 } 148 149 void P2PSocketClientImpl::DeliverOnSocketCreated( 150 const net::IPEndPoint& address) { 151 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 152 if (delegate_) 153 delegate_->OnOpen(address); 154 } 155 156 void P2PSocketClientImpl::OnIncomingTcpConnection( 157 const net::IPEndPoint& address) { 158 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 159 DCHECK_EQ(state_, STATE_OPEN); 160 161 scoped_refptr<P2PSocketClientImpl> new_client = 162 new P2PSocketClientImpl(dispatcher_); 163 new_client->socket_id_ = dispatcher_->RegisterClient(new_client.get()); 164 new_client->state_ = STATE_OPEN; 165 new_client->delegate_message_loop_ = delegate_message_loop_; 166 167 dispatcher_->SendP2PMessage(new P2PHostMsg_AcceptIncomingTcpConnection( 168 socket_id_, address, new_client->socket_id_)); 169 170 delegate_message_loop_->PostTask( 171 FROM_HERE, base::Bind( 172 &P2PSocketClientImpl::DeliverOnIncomingTcpConnection, 173 this, address, new_client)); 174 } 175 176 void P2PSocketClientImpl::DeliverOnIncomingTcpConnection( 177 const net::IPEndPoint& address, 178 scoped_refptr<P2PSocketClient> new_client) { 179 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 180 if (delegate_) { 181 delegate_->OnIncomingTcpConnection(address, new_client.get()); 182 } else { 183 // Just close the socket if there is no delegate to accept it. 184 new_client->Close(); 185 } 186 } 187 188 void P2PSocketClientImpl::OnSendComplete() { 189 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 190 191 delegate_message_loop_->PostTask( 192 FROM_HERE, base::Bind(&P2PSocketClientImpl::DeliverOnSendComplete, this)); 193 } 194 195 void P2PSocketClientImpl::DeliverOnSendComplete() { 196 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 197 if (delegate_) 198 delegate_->OnSendComplete(); 199 } 200 201 void P2PSocketClientImpl::OnError() { 202 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 203 state_ = STATE_ERROR; 204 205 delegate_message_loop_->PostTask( 206 FROM_HERE, base::Bind(&P2PSocketClientImpl::DeliverOnError, this)); 207 } 208 209 void P2PSocketClientImpl::DeliverOnError() { 210 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 211 if (delegate_) 212 delegate_->OnError(); 213 } 214 215 void P2PSocketClientImpl::OnDataReceived(const net::IPEndPoint& address, 216 const std::vector<char>& data, 217 const base::TimeTicks& timestamp) { 218 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 219 DCHECK_EQ(STATE_OPEN, state_); 220 delegate_message_loop_->PostTask( 221 FROM_HERE, 222 base::Bind(&P2PSocketClientImpl::DeliverOnDataReceived, 223 this, 224 address, 225 data, 226 timestamp)); 227 } 228 229 void P2PSocketClientImpl::DeliverOnDataReceived( 230 const net::IPEndPoint& address, const std::vector<char>& data, 231 const base::TimeTicks& timestamp) { 232 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 233 if (delegate_) 234 delegate_->OnDataReceived(address, data, timestamp); 235 } 236 237 void P2PSocketClientImpl::Detach() { 238 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 239 dispatcher_ = NULL; 240 OnError(); 241 } 242 243 } // namespace content 244