1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/message_loop/message_loop.h" 6 #include "base/rand_util.h" 7 #include "chrome/browser/devtools/device/android_device_manager.h" 8 #include "content/public/browser/browser_thread.h" 9 #include "net/base/io_buffer.h" 10 #include "net/base/net_errors.h" 11 #include "net/server/web_socket.h" 12 #include "net/socket/stream_socket.h" 13 14 using content::BrowserThread; 15 using net::WebSocket; 16 17 namespace { 18 19 const int kBufferSize = 16 * 1024; 20 21 class WebSocketImpl : public AndroidDeviceManager::AndroidWebSocket { 22 public: 23 typedef AndroidDeviceManager::Device Device; 24 WebSocketImpl(scoped_refptr<base::MessageLoopProxy> device_message_loop, 25 scoped_refptr<Device> device, 26 const std::string& socket_name, 27 const std::string& url, 28 Delegate* delegate); 29 30 virtual void Connect() OVERRIDE; 31 virtual void Disconnect() OVERRIDE; 32 virtual void SendFrame(const std::string& message) OVERRIDE; 33 virtual void ClearDelegate() OVERRIDE; 34 35 private: 36 friend class base::RefCountedThreadSafe<AndroidWebSocket>; 37 38 virtual ~WebSocketImpl(); 39 40 void Connected(int result, net::StreamSocket* socket); 41 void StartListeningOnHandlerThread(); 42 void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result); 43 void SendFrameOnHandlerThread(const std::string& message); 44 void SendPendingRequests(int result); 45 void DisconnectOnHandlerThread(bool closed_by_device); 46 47 void OnSocketOpened(); 48 void OnFrameRead(const std::string& message); 49 void OnSocketClosed(bool closed_by_device); 50 51 scoped_refptr<base::MessageLoopProxy> device_message_loop_; 52 scoped_refptr<Device> device_; 53 std::string socket_name_; 54 std::string url_; 55 scoped_ptr<net::StreamSocket> socket_; 56 Delegate* delegate_; 57 std::string response_buffer_; 58 std::string request_buffer_; 59 }; 60 61 WebSocketImpl::WebSocketImpl( 62 scoped_refptr<base::MessageLoopProxy> device_message_loop, 63 scoped_refptr<Device> device, 64 const std::string& socket_name, 65 const std::string& url, 66 Delegate* delegate) 67 : device_message_loop_(device_message_loop), 68 device_(device), 69 socket_name_(socket_name), 70 url_(url), 71 delegate_(delegate) { 72 } 73 74 void WebSocketImpl::Connect() { 75 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 76 device_->HttpUpgrade( 77 socket_name_, url_, base::Bind(&WebSocketImpl::Connected, this)); 78 } 79 80 void WebSocketImpl::Disconnect() { 81 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 82 device_message_loop_->PostTask( 83 FROM_HERE, 84 base::Bind(&WebSocketImpl::DisconnectOnHandlerThread, this, false)); 85 } 86 87 void WebSocketImpl::SendFrame(const std::string& message) { 88 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 89 device_message_loop_->PostTask( 90 FROM_HERE, 91 base::Bind(&WebSocketImpl::SendFrameOnHandlerThread, this, message)); 92 } 93 94 void WebSocketImpl::ClearDelegate() { 95 delegate_ = NULL; 96 } 97 98 void WebSocketImpl::SendFrameOnHandlerThread(const std::string& message) { 99 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); 100 int mask = base::RandInt(0, 0x7FFFFFFF); 101 std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask); 102 request_buffer_ += encoded_frame; 103 if (request_buffer_.length() == encoded_frame.length()) 104 SendPendingRequests(0); 105 } 106 107 WebSocketImpl::~WebSocketImpl() { 108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 109 } 110 111 void WebSocketImpl::Connected(int result, net::StreamSocket* socket) { 112 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 113 if (result != net::OK || socket == NULL) { 114 OnSocketClosed(true); 115 return; 116 } 117 socket_.reset(socket); 118 device_message_loop_->PostTask( 119 FROM_HERE, 120 base::Bind(&WebSocketImpl::StartListeningOnHandlerThread, this)); 121 OnSocketOpened(); 122 } 123 124 void WebSocketImpl::StartListeningOnHandlerThread() { 125 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); 126 scoped_refptr<net::IOBuffer> response_buffer = 127 new net::IOBuffer(kBufferSize); 128 int result = socket_->Read( 129 response_buffer.get(), 130 kBufferSize, 131 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); 132 if (result != net::ERR_IO_PENDING) 133 OnBytesRead(response_buffer, result); 134 } 135 136 void WebSocketImpl::OnBytesRead( 137 scoped_refptr<net::IOBuffer> response_buffer, int result) { 138 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); 139 if (!socket_) 140 return; 141 142 if (result <= 0) { 143 DisconnectOnHandlerThread(true); 144 return; 145 } 146 147 std::string data = std::string(response_buffer->data(), result); 148 response_buffer_ += data; 149 150 int bytes_consumed; 151 std::string output; 152 WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17( 153 response_buffer_, false, &bytes_consumed, &output); 154 155 while (parse_result == WebSocket::FRAME_OK) { 156 response_buffer_ = response_buffer_.substr(bytes_consumed); 157 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, 158 base::Bind(&WebSocketImpl::OnFrameRead, this, output)); 159 parse_result = WebSocket::DecodeFrameHybi17( 160 response_buffer_, false, &bytes_consumed, &output); 161 } 162 163 if (parse_result == WebSocket::FRAME_ERROR || 164 parse_result == WebSocket::FRAME_CLOSE) { 165 DisconnectOnHandlerThread(true); 166 return; 167 } 168 169 result = socket_->Read( 170 response_buffer.get(), 171 kBufferSize, 172 base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer)); 173 if (result != net::ERR_IO_PENDING) 174 OnBytesRead(response_buffer, result); 175 } 176 177 void WebSocketImpl::SendPendingRequests(int result) { 178 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); 179 if (!socket_) 180 return; 181 if (result < 0) { 182 DisconnectOnHandlerThread(true); 183 return; 184 } 185 request_buffer_ = request_buffer_.substr(result); 186 if (request_buffer_.empty()) 187 return; 188 189 scoped_refptr<net::StringIOBuffer> buffer = 190 new net::StringIOBuffer(request_buffer_); 191 result = socket_->Write(buffer.get(), buffer->size(), 192 base::Bind(&WebSocketImpl::SendPendingRequests, 193 this)); 194 if (result != net::ERR_IO_PENDING) 195 SendPendingRequests(result); 196 } 197 198 void WebSocketImpl::DisconnectOnHandlerThread(bool closed_by_device) { 199 DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current()); 200 if (!socket_) 201 return; 202 // Wipe out socket_ first since Disconnect can re-enter this method. 203 scoped_ptr<net::StreamSocket> socket(socket_.release()); 204 socket->Disconnect(); 205 BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, 206 base::Bind(&WebSocketImpl::OnSocketClosed, this, closed_by_device)); 207 } 208 209 void WebSocketImpl::OnSocketOpened() { 210 if (delegate_) 211 delegate_->OnSocketOpened(); 212 } 213 214 void WebSocketImpl::OnFrameRead(const std::string& message) { 215 if (delegate_) 216 delegate_->OnFrameRead(message); 217 } 218 219 void WebSocketImpl::OnSocketClosed(bool closed_by_device) { 220 if (delegate_) 221 delegate_->OnSocketClosed(closed_by_device); 222 } 223 224 } // namespace 225 226 scoped_refptr<AndroidDeviceManager::AndroidWebSocket> 227 AndroidDeviceManager::Device::CreateWebSocket( 228 const std::string& socket, 229 const std::string& url, 230 AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { 231 return new WebSocketImpl(device_message_loop_, this, socket, url, delegate); 232 } 233