1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/memory/weak_ptr.h" 6 #include "base/message_loop/message_loop.h" 7 #include "base/rand_util.h" 8 #include "chrome/browser/devtools/device/android_device_manager.h" 9 #include "content/public/browser/browser_thread.h" 10 #include "net/base/io_buffer.h" 11 #include "net/base/net_errors.h" 12 #include "net/server/web_socket.h" 13 #include "net/socket/stream_socket.h" 14 15 using content::BrowserThread; 16 using net::WebSocket; 17 18 namespace { 19 20 const int kBufferSize = 16 * 1024; 21 22 class WebSocketImpl { 23 public: 24 typedef AndroidDeviceManager::AndroidWebSocket::Delegate Delegate; 25 26 WebSocketImpl(Delegate* delegate, 27 scoped_ptr<net::StreamSocket> socket); 28 void StartListening(); 29 void SendFrame(const std::string& message); 30 31 private: 32 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); 33 void SendPendingRequests(int result); 34 void Disconnect(); 35 36 Delegate* delegate_; 37 scoped_ptr<net::StreamSocket> socket_; 38 std::string response_buffer_; 39 std::string request_buffer_; 40 base::ThreadChecker thread_checker_; 41 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); 42 }; 43 44 class DelegateWrapper 45 : public AndroidDeviceManager::AndroidWebSocket::Delegate { 46 public: 47 DelegateWrapper(base::WeakPtr<Delegate> weak_delegate, 48 scoped_refptr<base::MessageLoopProxy> message_loop) 49 : weak_delegate_(weak_delegate), 50 message_loop_(message_loop) { 51 } 52 53 virtual ~DelegateWrapper() {} 54 55 // AndroidWebSocket::Delegate implementation 56 virtual void OnSocketOpened() OVERRIDE { 57 message_loop_->PostTask(FROM_HERE, 58 base::Bind(&Delegate::OnSocketOpened, weak_delegate_)); 59 } 60 61 virtual void OnFrameRead(const std::string& message) OVERRIDE { 62 message_loop_->PostTask(FROM_HERE, 63 base::Bind(&Delegate::OnFrameRead, weak_delegate_, message)); 64 } 65 66 virtual void OnSocketClosed() OVERRIDE { 67 message_loop_->PostTask(FROM_HERE, 68 base::Bind(&Delegate::OnSocketClosed, weak_delegate_)); 69 } 70 71 private: 72 base::WeakPtr<Delegate> weak_delegate_; 73 scoped_refptr<base::MessageLoopProxy> message_loop_; 74 }; 75 76 class AndroidWebSocketImpl 77 : public AndroidDeviceManager::AndroidWebSocket, 78 public AndroidDeviceManager::AndroidWebSocket::Delegate { 79 public: 80 typedef AndroidDeviceManager::Device Device; 81 AndroidWebSocketImpl( 82 scoped_refptr<base::MessageLoopProxy> device_message_loop, 83 scoped_refptr<Device> device, 84 const std::string& socket_name, 85 const std::string& url, 86 AndroidWebSocket::Delegate* delegate); 87 88 virtual ~AndroidWebSocketImpl(); 89 90 // AndroidWebSocket implementation 91 virtual void SendFrame(const std::string& message) OVERRIDE; 92 93 // AndroidWebSocket::Delegate implementation 94 virtual void OnSocketOpened() OVERRIDE; 95 virtual void OnFrameRead(const std::string& message) OVERRIDE; 96 virtual void OnSocketClosed() OVERRIDE; 97 98 private: 99 void Connected(int result, scoped_ptr<net::StreamSocket> socket); 100 101 scoped_refptr<base::MessageLoopProxy> device_message_loop_; 102 scoped_refptr<Device> device_; 103 std::string socket_name_; 104 std::string url_; 105 WebSocketImpl* connection_; 106 DelegateWrapper* delegate_wrapper_; 107 AndroidWebSocket::Delegate* delegate_; 108 base::WeakPtrFactory<AndroidWebSocketImpl> weak_factory_; 109 DISALLOW_COPY_AND_ASSIGN(AndroidWebSocketImpl); 110 }; 111 112 AndroidWebSocketImpl::AndroidWebSocketImpl( 113 scoped_refptr<base::MessageLoopProxy> device_message_loop, 114 scoped_refptr<Device> device, 115 const std::string& socket_name, 116 const std::string& url, 117 AndroidWebSocket::Delegate* delegate) 118 : device_message_loop_(device_message_loop), 119 device_(device), 120 socket_name_(socket_name), 121 url_(url), 122 delegate_(delegate), 123 weak_factory_(this) { 124 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 125 DCHECK(delegate_); 126 device_->HttpUpgrade( 127 socket_name_, url_, 128 base::Bind(&AndroidWebSocketImpl::Connected, weak_factory_.GetWeakPtr())); 129 } 130 131 void AndroidWebSocketImpl::SendFrame(const std::string& message) { 132 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 133 device_message_loop_->PostTask( 134 FROM_HERE, 135 base::Bind(&WebSocketImpl::SendFrame, 136 base::Unretained(connection_), message)); 137 } 138 139 void WebSocketImpl::SendFrame(const std::string& message) { 140 DCHECK(thread_checker_.CalledOnValidThread()); 141 if (!socket_) 142 return; 143 int mask = base::RandInt(0, 0x7FFFFFFF); 144 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); 145 request_buffer_ += encoded_frame; 146 if (request_buffer_.length() == encoded_frame.length()) 147 SendPendingRequests(0); 148 } 149 150 AndroidWebSocketImpl::~AndroidWebSocketImpl() { 151 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 152 device_message_loop_->DeleteSoon(FROM_HERE, connection_); 153 device_message_loop_->DeleteSoon(FROM_HERE, delegate_wrapper_); 154 } 155 156 WebSocketImpl::WebSocketImpl(Delegate* delegate, 157 scoped_ptr<net::StreamSocket> socket) 158 : delegate_(delegate), 159 socket_(socket.Pass()) { 160 thread_checker_.DetachFromThread(); 161 } 162 163 void AndroidWebSocketImpl::Connected(int result, 164 scoped_ptr<net::StreamSocket> socket) { 165 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 166 if (result != net::OK || socket == NULL) { 167 OnSocketClosed(); 168 return; 169 } 170 delegate_wrapper_ = new DelegateWrapper(weak_factory_.GetWeakPtr(), 171 base::MessageLoopProxy::current()); 172 connection_ = new WebSocketImpl(delegate_wrapper_, socket.Pass()); 173 device_message_loop_->PostTask( 174 FROM_HERE, 175 base::Bind(&WebSocketImpl::StartListening, 176 base::Unretained(connection_))); 177 OnSocketOpened(); 178 } 179 180 void WebSocketImpl::StartListening() { 181 DCHECK(thread_checker_.CalledOnValidThread()); 182 DCHECK(socket_); 183 scoped_refptr<net::IOBuffer> response_buffer = 184 new net::IOBuffer(kBufferSize); 185 int result = socket_->Read( 186 response_buffer.get(), 187 kBufferSize, 188 base::Bind(&WebSocketImpl::OnBytesRead, 189 base::Unretained(this), response_buffer)); 190 if (result != net::ERR_IO_PENDING) 191 OnBytesRead(response_buffer, result); 192 } 193 194 void WebSocketImpl::OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, 195 int result) { 196 DCHECK(thread_checker_.CalledOnValidThread()); 197 if (result <= 0) { 198 Disconnect(); 199 return; 200 } 201 202 response_buffer_.append(response_buffer->data(), result); 203 204 int bytes_consumed; 205 std::string output; 206 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( 207 response_buffer_, false, &bytes_consumed, &output); 208 209 while (parse_result == WebSocket::FRAME_OK) { 210 response_buffer_ = response_buffer_.substr(bytes_consumed); 211 delegate_->OnFrameRead(output); 212 parse_result = WebSocket::DecodeFrameHybi17( 213 response_buffer_, false, &bytes_consumed, &output); 214 } 215 216 if (parse_result == WebSocket::FRAME_ERROR || 217 parse_result == WebSocket::FRAME_CLOSE) { 218 Disconnect(); 219 return; 220 } 221 222 result = socket_->Read( 223 response_buffer.get(), 224 kBufferSize, 225 base::Bind(&WebSocketImpl::OnBytesRead, 226 base::Unretained(this), response_buffer)); 227 if (result != net::ERR_IO_PENDING) 228 OnBytesRead(response_buffer, result); 229 } 230 231 void WebSocketImpl::SendPendingRequests(int result) { 232 DCHECK(thread_checker_.CalledOnValidThread()); 233 if (result < 0) { 234 Disconnect(); 235 return; 236 } 237 request_buffer_ = request_buffer_.substr(result); 238 if (request_buffer_.empty()) 239 return; 240 241 scoped_refptr<net::StringIOBuffer> buffer = 242 new net::StringIOBuffer(request_buffer_); 243 result = socket_->Write(buffer.get(), buffer->size(), 244 base::Bind(&WebSocketImpl::SendPendingRequests, 245 base::Unretained(this))); 246 if (result != net::ERR_IO_PENDING) 247 SendPendingRequests(result); 248 } 249 250 void WebSocketImpl::Disconnect() { 251 DCHECK(thread_checker_.CalledOnValidThread()); 252 socket_.reset(); 253 delegate_->OnSocketClosed(); 254 } 255 256 void AndroidWebSocketImpl::OnSocketOpened() { 257 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 258 delegate_->OnSocketOpened(); 259 } 260 261 void AndroidWebSocketImpl::OnFrameRead(const std::string& message) { 262 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 263 delegate_->OnFrameRead(message); 264 } 265 266 void AndroidWebSocketImpl::OnSocketClosed() { 267 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 268 delegate_->OnSocketClosed(); 269 } 270 271 } // namespace 272 273 AndroidDeviceManager::AndroidWebSocket* 274 AndroidDeviceManager::Device::CreateWebSocket( 275 const std::string& socket, 276 const std::string& url, 277 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { 278 return new AndroidWebSocketImpl( 279 device_message_loop_, this, socket, url, delegate); 280 } 281