Home | History | Annotate | Download | only in device
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_loop/message_loop.h"
      6 #include "base/rand_util.h"
      7 #include "chrome/browser/devtools/device/android_device_manager.h"
      8 #include "content/public/browser/browser_thread.h"
      9 #include "net/base/io_buffer.h"
     10 #include "net/base/net_errors.h"
     11 #include "net/server/web_socket.h"
     12 #include "net/socket/stream_socket.h"
     13 
     14 using content::BrowserThread;
     15 using net::WebSocket;
     16 
     17 namespace {
     18 
     // Size, in bytes, of the IOBuffer passed to each socket Read() call.
     const int kBufferSize = 1 << 14;  // 16 KiB
     20 
     21 class WebSocketImpl : public AndroidDeviceManager::AndroidWebSocket {
     22  public:
     23   typedef AndroidDeviceManager::Device Device;
     24   WebSocketImpl(scoped_refptr<base::MessageLoopProxy> device_message_loop,
     25                 scoped_refptr<Device> device,
     26                 const std::string& socket_name,
     27                 const std::string& url,
     28                 Delegate* delegate);
     29 
     30   virtual void Connect() OVERRIDE;
     31   virtual void Disconnect() OVERRIDE;
     32   virtual void SendFrame(const std::string& message) OVERRIDE;
     33   virtual void ClearDelegate() OVERRIDE;
     34 
     35  private:
     36   friend class base::RefCountedThreadSafe<AndroidWebSocket>;
     37 
     38   virtual ~WebSocketImpl();
     39 
     40   void Connected(int result, net::StreamSocket* socket);
     41   void StartListeningOnHandlerThread();
     42   void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result);
     43   void SendFrameOnHandlerThread(const std::string& message);
     44   void SendPendingRequests(int result);
     45   void DisconnectOnHandlerThread(bool closed_by_device);
     46 
     47   void OnSocketOpened();
     48   void OnFrameRead(const std::string& message);
     49   void OnSocketClosed(bool closed_by_device);
     50 
     51   scoped_refptr<base::MessageLoopProxy> device_message_loop_;
     52   scoped_refptr<Device> device_;
     53   std::string socket_name_;
     54   std::string url_;
     55   scoped_ptr<net::StreamSocket> socket_;
     56   Delegate* delegate_;
     57   std::string response_buffer_;
     58   std::string request_buffer_;
     59 };
     60 
     61 WebSocketImpl::WebSocketImpl(
     62     scoped_refptr<base::MessageLoopProxy> device_message_loop,
     63     scoped_refptr<Device> device,
     64     const std::string& socket_name,
     65     const std::string& url,
     66     Delegate* delegate)
     67     : device_message_loop_(device_message_loop),
     68       device_(device),
     69       socket_name_(socket_name),
     70       url_(url),
     71       delegate_(delegate) {
     72 }
     73 
     74 void WebSocketImpl::Connect() {
     75   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
     76   device_->HttpUpgrade(
     77       socket_name_, url_, base::Bind(&WebSocketImpl::Connected, this));
     78 }
     79 
     80 void WebSocketImpl::Disconnect() {
     81   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
     82   device_message_loop_->PostTask(
     83       FROM_HERE,
     84       base::Bind(&WebSocketImpl::DisconnectOnHandlerThread, this, false));
     85 }
     86 
     87 void WebSocketImpl::SendFrame(const std::string& message) {
     88   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
     89   device_message_loop_->PostTask(
     90       FROM_HERE,
     91       base::Bind(&WebSocketImpl::SendFrameOnHandlerThread, this, message));
     92 }
     93 
     94 void WebSocketImpl::ClearDelegate() {
     95   delegate_ = NULL;
     96 }
     97 
     98 void WebSocketImpl::SendFrameOnHandlerThread(const std::string& message) {
     99   DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
    100   int mask = base::RandInt(0, 0x7FFFFFFF);
    101   std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask);
    102   request_buffer_ += encoded_frame;
    103   if (request_buffer_.length() == encoded_frame.length())
    104     SendPendingRequests(0);
    105 }
    106 
    107 WebSocketImpl::~WebSocketImpl() {
    108   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    109 }
    110 
    111 void WebSocketImpl::Connected(int result, net::StreamSocket* socket) {
    112   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    113   if (result != net::OK || socket == NULL) {
    114     OnSocketClosed(true);
    115     return;
    116   }
    117   socket_.reset(socket);
    118   device_message_loop_->PostTask(
    119       FROM_HERE,
    120       base::Bind(&WebSocketImpl::StartListeningOnHandlerThread, this));
    121   OnSocketOpened();
    122 }
    123 
    124 void WebSocketImpl::StartListeningOnHandlerThread() {
    125   DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
    126   scoped_refptr<net::IOBuffer> response_buffer =
    127       new net::IOBuffer(kBufferSize);
    128   int result = socket_->Read(
    129       response_buffer.get(),
    130       kBufferSize,
    131       base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer));
    132   if (result != net::ERR_IO_PENDING)
    133     OnBytesRead(response_buffer, result);
    134 }
    135 
    136 void WebSocketImpl::OnBytesRead(
    137     scoped_refptr<net::IOBuffer> response_buffer, int result) {
    138   DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
    139   if (!socket_)
    140     return;
    141 
    142   if (result <= 0) {
    143     DisconnectOnHandlerThread(true);
    144     return;
    145   }
    146 
    147   std::string data = std::string(response_buffer->data(), result);
    148   response_buffer_ += data;
    149 
    150   int bytes_consumed;
    151   std::string output;
    152   WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17(
    153       response_buffer_, false, &bytes_consumed, &output);
    154 
    155   while (parse_result == WebSocket::FRAME_OK) {
    156     response_buffer_ = response_buffer_.substr(bytes_consumed);
    157     BrowserThread::PostTask(BrowserThread::UI, FROM_HERE,
    158         base::Bind(&WebSocketImpl::OnFrameRead, this, output));
    159     parse_result = WebSocket::DecodeFrameHybi17(
    160         response_buffer_, false, &bytes_consumed, &output);
    161   }
    162 
    163   if (parse_result == WebSocket::FRAME_ERROR ||
    164       parse_result == WebSocket::FRAME_CLOSE) {
    165     DisconnectOnHandlerThread(true);
    166     return;
    167   }
    168 
    169   result = socket_->Read(
    170       response_buffer.get(),
    171       kBufferSize,
    172       base::Bind(&WebSocketImpl::OnBytesRead, this, response_buffer));
    173   if (result != net::ERR_IO_PENDING)
    174     OnBytesRead(response_buffer, result);
    175 }
    176 
    177 void WebSocketImpl::SendPendingRequests(int result) {
    178   DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
    179   if (!socket_)
    180     return;
    181   if (result < 0) {
    182     DisconnectOnHandlerThread(true);
    183     return;
    184   }
    185   request_buffer_ = request_buffer_.substr(result);
    186   if (request_buffer_.empty())
    187     return;
    188 
    189   scoped_refptr<net::StringIOBuffer> buffer =
    190       new net::StringIOBuffer(request_buffer_);
    191   result = socket_->Write(buffer.get(), buffer->size(),
    192                           base::Bind(&WebSocketImpl::SendPendingRequests,
    193                                      this));
    194   if (result != net::ERR_IO_PENDING)
    195     SendPendingRequests(result);
    196 }
    197 
    198 void WebSocketImpl::DisconnectOnHandlerThread(bool closed_by_device) {
    199   DCHECK_EQ(device_message_loop_, base::MessageLoopProxy::current());
    200   if (!socket_)
    201     return;
    202   // Wipe out socket_ first since Disconnect can re-enter this method.
    203   scoped_ptr<net::StreamSocket> socket(socket_.release());
    204   socket->Disconnect();
    205   BrowserThread::PostTask(BrowserThread::UI, FROM_HERE,
    206       base::Bind(&WebSocketImpl::OnSocketClosed, this, closed_by_device));
    207 }
    208 
    209 void WebSocketImpl::OnSocketOpened() {
    210   if (delegate_)
    211     delegate_->OnSocketOpened();
    212 }
    213 
    214 void WebSocketImpl::OnFrameRead(const std::string& message) {
    215   if (delegate_)
    216     delegate_->OnFrameRead(message);
    217 }
    218 
    219 void WebSocketImpl::OnSocketClosed(bool closed_by_device) {
    220   if (delegate_)
    221     delegate_->OnSocketClosed(closed_by_device);
    222 }
    223 
    224 }  // namespace
    225 
    226 scoped_refptr<AndroidDeviceManager::AndroidWebSocket>
    227 AndroidDeviceManager::Device::CreateWebSocket(
    228     const std::string& socket,
    229     const std::string& url,
    230     AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) {
    231   return new WebSocketImpl(device_message_loop_, this, socket, url, delegate);
    232 }
    233