1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "content/renderer/p2p/socket_client_impl.h" 6 7 #include "base/bind.h" 8 #include "base/message_loop/message_loop_proxy.h" 9 #include "base/time/time.h" 10 #include "content/common/p2p_messages.h" 11 #include "content/renderer/p2p/socket_client_delegate.h" 12 #include "content/renderer/p2p/socket_dispatcher.h" 13 #include "content/renderer/render_thread_impl.h" 14 #include "crypto/random.h" 15 16 namespace { 17 18 uint64 GetUniqueId(uint32 random_socket_id, uint32 packet_id) { 19 uint64 uid = random_socket_id; 20 uid <<= 32; 21 uid |= packet_id; 22 return uid; 23 } 24 25 } // namespace 26 27 namespace content { 28 29 P2PSocketClientImpl::P2PSocketClientImpl(P2PSocketDispatcher* dispatcher) 30 : dispatcher_(dispatcher), 31 ipc_message_loop_(dispatcher->message_loop()), 32 delegate_message_loop_(base::MessageLoopProxy::current()), 33 socket_id_(0), delegate_(NULL), 34 state_(STATE_UNINITIALIZED), 35 random_socket_id_(0), 36 next_packet_id_(0) { 37 crypto::RandBytes(&random_socket_id_, sizeof(random_socket_id_)); 38 } 39 40 P2PSocketClientImpl::~P2PSocketClientImpl() { 41 CHECK(state_ == STATE_CLOSED || state_ == STATE_UNINITIALIZED); 42 } 43 44 void P2PSocketClientImpl::Init( 45 P2PSocketType type, 46 const net::IPEndPoint& local_address, 47 const P2PHostAndIPEndPoint& remote_address, 48 P2PSocketClientDelegate* delegate) { 49 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 50 DCHECK(delegate); 51 // |delegate_| is only accessesed on |delegate_message_loop_|. 52 delegate_ = delegate; 53 54 ipc_message_loop_->PostTask( 55 FROM_HERE, base::Bind(&P2PSocketClientImpl::DoInit, 56 this, 57 type, 58 local_address, 59 remote_address)); 60 } 61 62 void P2PSocketClientImpl::DoInit(P2PSocketType type, 63 const net::IPEndPoint& local_address, 64 const P2PHostAndIPEndPoint& remote_address) { 65 DCHECK_EQ(state_, STATE_UNINITIALIZED); 66 state_ = STATE_OPENING; 67 socket_id_ = dispatcher_->RegisterClient(this); 68 dispatcher_->SendP2PMessage(new P2PHostMsg_CreateSocket( 69 type, socket_id_, local_address, remote_address)); 70 } 71 72 void P2PSocketClientImpl::SendWithDscp( 73 const net::IPEndPoint& address, 74 const std::vector<char>& data, 75 const talk_base::PacketOptions& options) { 76 if (!ipc_message_loop_->BelongsToCurrentThread()) { 77 ipc_message_loop_->PostTask( 78 FROM_HERE, base::Bind( 79 &P2PSocketClientImpl::SendWithDscp, this, address, data, options)); 80 return; 81 } 82 83 // Can send data only when the socket is open. 84 DCHECK(state_ == STATE_OPEN || state_ == STATE_ERROR); 85 if (state_ == STATE_OPEN) { 86 uint64 unique_id = GetUniqueId(random_socket_id_, ++next_packet_id_); 87 TRACE_EVENT_ASYNC_BEGIN0("p2p", "Send", unique_id); 88 dispatcher_->SendP2PMessage(new P2PHostMsg_Send(socket_id_, address, data, 89 options, unique_id)); 90 } 91 } 92 93 void P2PSocketClientImpl::Send(const net::IPEndPoint& address, 94 const std::vector<char>& data) { 95 talk_base::PacketOptions options(talk_base::DSCP_DEFAULT); 96 SendWithDscp(address, data, options); 97 } 98 99 void P2PSocketClientImpl::SetOption(P2PSocketOption option, 100 int value) { 101 if (!ipc_message_loop_->BelongsToCurrentThread()) { 102 ipc_message_loop_->PostTask( 103 FROM_HERE, base::Bind( 104 &P2PSocketClientImpl::SetOption, this, option, value)); 105 return; 106 } 107 108 DCHECK(state_ == STATE_OPEN || state_ == STATE_ERROR); 109 if (state_ == STATE_OPEN) { 110 dispatcher_->SendP2PMessage(new P2PHostMsg_SetOption(socket_id_, 111 option, value)); 112 } 113 } 114 115 void P2PSocketClientImpl::Close() { 116 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 117 118 delegate_ = NULL; 119 120 ipc_message_loop_->PostTask( 121 FROM_HERE, base::Bind(&P2PSocketClientImpl::DoClose, this)); 122 } 123 124 void P2PSocketClientImpl::DoClose() { 125 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 126 if (dispatcher_) { 127 if (state_ == STATE_OPEN || state_ == STATE_OPENING || 128 state_ == STATE_ERROR) { 129 dispatcher_->SendP2PMessage(new P2PHostMsg_DestroySocket(socket_id_)); 130 } 131 dispatcher_->UnregisterClient(socket_id_); 132 } 133 134 state_ = STATE_CLOSED; 135 } 136 137 int P2PSocketClientImpl::GetSocketID() const { 138 return socket_id_; 139 } 140 141 void P2PSocketClientImpl::SetDelegate(P2PSocketClientDelegate* delegate) { 142 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 143 delegate_ = delegate; 144 } 145 146 void P2PSocketClientImpl::OnSocketCreated(const net::IPEndPoint& address) { 147 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 148 DCHECK_EQ(state_, STATE_OPENING); 149 state_ = STATE_OPEN; 150 151 delegate_message_loop_->PostTask( 152 FROM_HERE, 153 base::Bind(&P2PSocketClientImpl::DeliverOnSocketCreated, this, address)); 154 } 155 156 void P2PSocketClientImpl::DeliverOnSocketCreated( 157 const net::IPEndPoint& address) { 158 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 159 if (delegate_) 160 delegate_->OnOpen(address); 161 } 162 163 void P2PSocketClientImpl::OnIncomingTcpConnection( 164 const net::IPEndPoint& address) { 165 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 166 DCHECK_EQ(state_, STATE_OPEN); 167 168 scoped_refptr<P2PSocketClientImpl> new_client = 169 new P2PSocketClientImpl(dispatcher_); 170 new_client->socket_id_ = dispatcher_->RegisterClient(new_client.get()); 171 new_client->state_ = STATE_OPEN; 172 new_client->delegate_message_loop_ = delegate_message_loop_; 173 174 dispatcher_->SendP2PMessage(new P2PHostMsg_AcceptIncomingTcpConnection( 175 socket_id_, address, new_client->socket_id_)); 176 177 delegate_message_loop_->PostTask( 178 FROM_HERE, base::Bind( 179 &P2PSocketClientImpl::DeliverOnIncomingTcpConnection, 180 this, address, new_client)); 181 } 182 183 void P2PSocketClientImpl::DeliverOnIncomingTcpConnection( 184 const net::IPEndPoint& address, 185 scoped_refptr<P2PSocketClient> new_client) { 186 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 187 if (delegate_) { 188 delegate_->OnIncomingTcpConnection(address, new_client.get()); 189 } else { 190 // Just close the socket if there is no delegate to accept it. 191 new_client->Close(); 192 } 193 } 194 195 void P2PSocketClientImpl::OnSendComplete() { 196 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 197 198 delegate_message_loop_->PostTask( 199 FROM_HERE, base::Bind(&P2PSocketClientImpl::DeliverOnSendComplete, this)); 200 } 201 202 void P2PSocketClientImpl::DeliverOnSendComplete() { 203 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 204 if (delegate_) 205 delegate_->OnSendComplete(); 206 } 207 208 void P2PSocketClientImpl::OnError() { 209 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 210 state_ = STATE_ERROR; 211 212 delegate_message_loop_->PostTask( 213 FROM_HERE, base::Bind(&P2PSocketClientImpl::DeliverOnError, this)); 214 } 215 216 void P2PSocketClientImpl::DeliverOnError() { 217 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 218 if (delegate_) 219 delegate_->OnError(); 220 } 221 222 void P2PSocketClientImpl::OnDataReceived(const net::IPEndPoint& address, 223 const std::vector<char>& data, 224 const base::TimeTicks& timestamp) { 225 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 226 DCHECK_EQ(STATE_OPEN, state_); 227 delegate_message_loop_->PostTask( 228 FROM_HERE, 229 base::Bind(&P2PSocketClientImpl::DeliverOnDataReceived, 230 this, 231 address, 232 data, 233 timestamp)); 234 } 235 236 void P2PSocketClientImpl::DeliverOnDataReceived( 237 const net::IPEndPoint& address, const std::vector<char>& data, 238 const base::TimeTicks& timestamp) { 239 DCHECK(delegate_message_loop_->BelongsToCurrentThread()); 240 if (delegate_) 241 delegate_->OnDataReceived(address, data, timestamp); 242 } 243 244 void P2PSocketClientImpl::Detach() { 245 DCHECK(ipc_message_loop_->BelongsToCurrentThread()); 246 dispatcher_ = NULL; 247 OnError(); 248 } 249 250 } // namespace content 251