Home | History | Annotate | Download | only in device
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/memory/weak_ptr.h"
      6 #include "base/message_loop/message_loop.h"
      7 #include "base/rand_util.h"
      8 #include "chrome/browser/devtools/device/android_device_manager.h"
      9 #include "content/public/browser/browser_thread.h"
     10 #include "net/base/io_buffer.h"
     11 #include "net/base/net_errors.h"
     12 #include "net/server/web_socket.h"
     13 #include "net/socket/stream_socket.h"
     14 
     15 using content::BrowserThread;
     16 using net::WebSocket;
     17 
     18 namespace {
     19 
     20 const int kBufferSize = 16 * 1024;
     21 
     22 class WebSocketImpl {
     23  public:
     24   typedef AndroidDeviceManager::AndroidWebSocket::Delegate Delegate;
     25 
     26   WebSocketImpl(Delegate* delegate,
     27                 scoped_ptr<net::StreamSocket> socket);
     28   void StartListening();
     29   void SendFrame(const std::string& message);
     30 
     31  private:
     32   void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result);
     33   void SendPendingRequests(int result);
     34   void Disconnect();
     35 
     36   Delegate* delegate_;
     37   scoped_ptr<net::StreamSocket> socket_;
     38   std::string response_buffer_;
     39   std::string request_buffer_;
     40   base::ThreadChecker thread_checker_;
     41   DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
     42 };
     43 
     44 class DelegateWrapper
     45     : public AndroidDeviceManager::AndroidWebSocket::Delegate {
     46  public:
     47   DelegateWrapper(base::WeakPtr<Delegate> weak_delegate,
     48                   scoped_refptr<base::MessageLoopProxy> message_loop)
     49       : weak_delegate_(weak_delegate),
     50         message_loop_(message_loop) {
     51   }
     52 
     53   virtual ~DelegateWrapper() {}
     54 
     55   // AndroidWebSocket::Delegate implementation
     56   virtual void OnSocketOpened() OVERRIDE {
     57     message_loop_->PostTask(FROM_HERE,
     58         base::Bind(&Delegate::OnSocketOpened, weak_delegate_));
     59   }
     60 
     61   virtual void OnFrameRead(const std::string& message) OVERRIDE {
     62     message_loop_->PostTask(FROM_HERE,
     63         base::Bind(&Delegate::OnFrameRead, weak_delegate_, message));
     64   }
     65 
     66   virtual void OnSocketClosed() OVERRIDE {
     67     message_loop_->PostTask(FROM_HERE,
     68         base::Bind(&Delegate::OnSocketClosed, weak_delegate_));
     69   }
     70 
     71  private:
     72   base::WeakPtr<Delegate> weak_delegate_;
     73   scoped_refptr<base::MessageLoopProxy> message_loop_;
     74 };
     75 
     76 class AndroidWebSocketImpl
     77     : public AndroidDeviceManager::AndroidWebSocket,
     78       public AndroidDeviceManager::AndroidWebSocket::Delegate {
     79  public:
     80   typedef AndroidDeviceManager::Device Device;
     81   AndroidWebSocketImpl(
     82       scoped_refptr<base::MessageLoopProxy> device_message_loop,
     83       scoped_refptr<Device> device,
     84       const std::string& socket_name,
     85       const std::string& url,
     86       AndroidWebSocket::Delegate* delegate);
     87 
     88   virtual ~AndroidWebSocketImpl();
     89 
     90   // AndroidWebSocket implementation
     91   virtual void SendFrame(const std::string& message) OVERRIDE;
     92 
     93   // AndroidWebSocket::Delegate implementation
     94   virtual void OnSocketOpened() OVERRIDE;
     95   virtual void OnFrameRead(const std::string& message) OVERRIDE;
     96   virtual void OnSocketClosed() OVERRIDE;
     97 
     98  private:
     99   void Connected(int result, scoped_ptr<net::StreamSocket> socket);
    100 
    101   scoped_refptr<base::MessageLoopProxy> device_message_loop_;
    102   scoped_refptr<Device> device_;
    103   std::string socket_name_;
    104   std::string url_;
    105   WebSocketImpl* connection_;
    106   DelegateWrapper* delegate_wrapper_;
    107   AndroidWebSocket::Delegate* delegate_;
    108   base::WeakPtrFactory<AndroidWebSocketImpl> weak_factory_;
    109   DISALLOW_COPY_AND_ASSIGN(AndroidWebSocketImpl);
    110 };
    111 
    112 AndroidWebSocketImpl::AndroidWebSocketImpl(
    113     scoped_refptr<base::MessageLoopProxy> device_message_loop,
    114     scoped_refptr<Device> device,
    115     const std::string& socket_name,
    116     const std::string& url,
    117     AndroidWebSocket::Delegate* delegate)
    118     : device_message_loop_(device_message_loop),
    119       device_(device),
    120       socket_name_(socket_name),
    121       url_(url),
    122       delegate_(delegate),
    123       weak_factory_(this) {
    124   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    125   DCHECK(delegate_);
    126   device_->HttpUpgrade(
    127       socket_name_, url_,
    128       base::Bind(&AndroidWebSocketImpl::Connected, weak_factory_.GetWeakPtr()));
    129 }
    130 
    131 void AndroidWebSocketImpl::SendFrame(const std::string& message) {
    132   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    133   device_message_loop_->PostTask(
    134       FROM_HERE,
    135       base::Bind(&WebSocketImpl::SendFrame,
    136                  base::Unretained(connection_), message));
    137 }
    138 
    139 void WebSocketImpl::SendFrame(const std::string& message) {
    140   DCHECK(thread_checker_.CalledOnValidThread());
    141   if (!socket_)
    142     return;
    143   int mask = base::RandInt(0, 0x7FFFFFFF);
    144   std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask);
    145   request_buffer_ += encoded_frame;
    146   if (request_buffer_.length() == encoded_frame.length())
    147     SendPendingRequests(0);
    148 }
    149 
    150 AndroidWebSocketImpl::~AndroidWebSocketImpl() {
    151   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    152   device_message_loop_->DeleteSoon(FROM_HERE, connection_);
    153   device_message_loop_->DeleteSoon(FROM_HERE, delegate_wrapper_);
    154 }
    155 
    156 WebSocketImpl::WebSocketImpl(Delegate* delegate,
    157                              scoped_ptr<net::StreamSocket> socket)
    158                              : delegate_(delegate),
    159                                socket_(socket.Pass()) {
    160   thread_checker_.DetachFromThread();
    161 }
    162 
    163 void AndroidWebSocketImpl::Connected(int result,
    164                                      scoped_ptr<net::StreamSocket> socket) {
    165   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    166   if (result != net::OK || socket == NULL) {
    167     OnSocketClosed();
    168     return;
    169   }
    170   delegate_wrapper_ = new DelegateWrapper(weak_factory_.GetWeakPtr(),
    171                                           base::MessageLoopProxy::current());
    172   connection_ = new WebSocketImpl(delegate_wrapper_, socket.Pass());
    173   device_message_loop_->PostTask(
    174       FROM_HERE,
    175       base::Bind(&WebSocketImpl::StartListening,
    176                  base::Unretained(connection_)));
    177   OnSocketOpened();
    178 }
    179 
    180 void WebSocketImpl::StartListening() {
    181   DCHECK(thread_checker_.CalledOnValidThread());
    182   DCHECK(socket_);
    183   scoped_refptr<net::IOBuffer> response_buffer =
    184       new net::IOBuffer(kBufferSize);
    185   int result = socket_->Read(
    186       response_buffer.get(),
    187       kBufferSize,
    188       base::Bind(&WebSocketImpl::OnBytesRead,
    189                  base::Unretained(this), response_buffer));
    190   if (result != net::ERR_IO_PENDING)
    191     OnBytesRead(response_buffer, result);
    192 }
    193 
    194 void WebSocketImpl::OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer,
    195                                 int result) {
    196   DCHECK(thread_checker_.CalledOnValidThread());
    197   if (result <= 0) {
    198     Disconnect();
    199     return;
    200   }
    201 
    202   response_buffer_.append(response_buffer->data(), result);
    203 
    204   int bytes_consumed;
    205   std::string output;
    206   WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17(
    207       response_buffer_, false, &bytes_consumed, &output);
    208 
    209   while (parse_result == WebSocket::FRAME_OK) {
    210     response_buffer_ = response_buffer_.substr(bytes_consumed);
    211     delegate_->OnFrameRead(output);
    212     parse_result = WebSocket::DecodeFrameHybi17(
    213         response_buffer_, false, &bytes_consumed, &output);
    214   }
    215 
    216   if (parse_result == WebSocket::FRAME_ERROR ||
    217       parse_result == WebSocket::FRAME_CLOSE) {
    218     Disconnect();
    219     return;
    220   }
    221 
    222   result = socket_->Read(
    223       response_buffer.get(),
    224       kBufferSize,
    225       base::Bind(&WebSocketImpl::OnBytesRead,
    226                  base::Unretained(this), response_buffer));
    227   if (result != net::ERR_IO_PENDING)
    228     OnBytesRead(response_buffer, result);
    229 }
    230 
    231 void WebSocketImpl::SendPendingRequests(int result) {
    232   DCHECK(thread_checker_.CalledOnValidThread());
    233   if (result < 0) {
    234     Disconnect();
    235     return;
    236   }
    237   request_buffer_ = request_buffer_.substr(result);
    238   if (request_buffer_.empty())
    239     return;
    240 
    241   scoped_refptr<net::StringIOBuffer> buffer =
    242       new net::StringIOBuffer(request_buffer_);
    243   result = socket_->Write(buffer.get(), buffer->size(),
    244                           base::Bind(&WebSocketImpl::SendPendingRequests,
    245                                      base::Unretained(this)));
    246   if (result != net::ERR_IO_PENDING)
    247     SendPendingRequests(result);
    248 }
    249 
    250 void WebSocketImpl::Disconnect() {
    251   DCHECK(thread_checker_.CalledOnValidThread());
    252   socket_.reset();
    253   delegate_->OnSocketClosed();
    254 }
    255 
    256 void AndroidWebSocketImpl::OnSocketOpened() {
    257   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    258   delegate_->OnSocketOpened();
    259 }
    260 
    261 void AndroidWebSocketImpl::OnFrameRead(const std::string& message) {
    262   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    263   delegate_->OnFrameRead(message);
    264 }
    265 
    266 void AndroidWebSocketImpl::OnSocketClosed() {
    267   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    268   delegate_->OnSocketClosed();
    269 }
    270 
    271 }  // namespace
    272 
    273 AndroidDeviceManager::AndroidWebSocket*
    274 AndroidDeviceManager::Device::CreateWebSocket(
    275     const std::string& socket,
    276     const std::string& url,
    277     AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) {
    278   return new AndroidWebSocketImpl(
    279       device_message_loop_, this, socket, url, delegate);
    280 }
    281