Home | History | Annotate | Download | only in forwarder2
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "tools/android/forwarder2/forwarder.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/bind.h"
      9 #include "base/logging.h"
     10 #include "base/memory/ref_counted.h"
     11 #include "base/posix/eintr_wrapper.h"
     12 #include "base/single_thread_task_runner.h"
     13 #include "tools/android/forwarder2/socket.h"
     14 
     15 namespace forwarder2 {
     16 namespace {
     17 
     18 // Helper class to buffer reads and writes from one socket to another.
     19 class BufferedCopier {
     20  public:
     21   // Does NOT own the pointers.
     22   BufferedCopier(Socket* socket_from,
     23                  Socket* socket_to)
     24       : socket_from_(socket_from),
     25         socket_to_(socket_to),
     26         bytes_read_(0),
     27         write_offset_(0) {
     28   }
     29 
     30   bool AddToReadSet(fd_set* read_fds) {
     31     if (bytes_read_ == 0)
     32       return socket_from_->AddFdToSet(read_fds);
     33     return false;
     34   }
     35 
     36   bool AddToWriteSet(fd_set* write_fds) {
     37     if (write_offset_ < bytes_read_)
     38       return socket_to_->AddFdToSet(write_fds);
     39     return false;
     40   }
     41 
     42   bool TryRead(const fd_set& read_fds) {
     43     if (!socket_from_->IsFdInSet(read_fds))
     44       return false;
     45     if (bytes_read_ != 0)  // Can't read.
     46       return false;
     47     int ret = socket_from_->Read(buffer_, kBufferSize);
     48     if (ret > 0) {
     49       bytes_read_ = ret;
     50       return true;
     51     }
     52     return false;
     53   }
     54 
     55   bool TryWrite(const fd_set& write_fds) {
     56     if (!socket_to_->IsFdInSet(write_fds))
     57       return false;
     58     if (write_offset_ >= bytes_read_)  // Nothing to write.
     59       return false;
     60     int ret = socket_to_->Write(buffer_ + write_offset_,
     61                                 bytes_read_ - write_offset_);
     62     if (ret > 0) {
     63       write_offset_ += ret;
     64       if (write_offset_ == bytes_read_) {
     65         write_offset_ = 0;
     66         bytes_read_ = 0;
     67       }
     68       return true;
     69     }
     70     return false;
     71   }
     72 
     73  private:
     74   // Not owned.
     75   Socket* socket_from_;
     76   Socket* socket_to_;
     77 
     78   // A big buffer to let our file-over-http bridge work more like real file.
     79   static const int kBufferSize = 1024 * 128;
     80   int bytes_read_;
     81   int write_offset_;
     82   char buffer_[kBufferSize];
     83 
     84   DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
     85 };
     86 
     87 // Internal class that wraps a helper thread to forward traffic between
     88 // |socket1| and |socket2|. After creating a new instance, call its Start()
     89 // method to launch operations. Thread stops automatically if one of the socket
     90 // disconnects, but ensures that all buffered writes to the other, still alive,
     91 // socket, are written first. When this happens, the instance will delete itself
     92 // automatically.
     93 // Note that the instance will always be destroyed on the same thread that
     94 // created it.
     95 class Forwarder {
     96  public:
     97   Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
     98       : socket1_(socket1.Pass()),
     99         socket2_(socket2.Pass()),
    100         destructor_runner_(base::MessageLoopProxy::current()),
    101         thread_("ForwarderThread") {
    102   }
    103 
    104   void Start() {
    105     thread_.Start();
    106     thread_.message_loop_proxy()->PostTask(
    107         FROM_HERE,
    108         base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
    109   }
    110 
    111  private:
    112   void ThreadHandler() {
    113     const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
    114     fd_set read_fds;
    115     fd_set write_fds;
    116 
    117     // Copy from socket1 to socket2
    118     BufferedCopier buffer1(socket1_.get(), socket2_.get());
    119     // Copy from socket2 to socket1
    120     BufferedCopier buffer2(socket2_.get(), socket1_.get());
    121 
    122     bool run = true;
    123     while (run) {
    124       FD_ZERO(&read_fds);
    125       FD_ZERO(&write_fds);
    126 
    127       buffer1.AddToReadSet(&read_fds);
    128       buffer2.AddToReadSet(&read_fds);
    129       buffer1.AddToWriteSet(&write_fds);
    130       buffer2.AddToWriteSet(&write_fds);
    131 
    132       if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) {
    133         PLOG(ERROR) << "select";
    134         break;
    135       }
    136       // When a socket in the read set closes the connection, select() returns
    137       // with that socket descriptor set as "ready to read".  When we call
    138       // TryRead() below, it will return false, but the while loop will continue
    139       // to run until all the write operations are finished, to make sure the
    140       // buffers are completely flushed out.
    141 
    142       // Keep running while we have some operation to do.
    143       run = buffer1.TryRead(read_fds);
    144       run = run || buffer2.TryRead(read_fds);
    145       run = run || buffer1.TryWrite(write_fds);
    146       run = run || buffer2.TryWrite(write_fds);
    147     }
    148 
    149     // Note that the thread that |destruction_runner_| runs tasks on could be
    150     // temporarily blocked on I/O (e.g. select()) therefore it is safer to close
    151     // the sockets now rather than relying on the destructor.
    152     socket1_.reset();
    153     socket2_.reset();
    154 
    155     // Note that base::Thread must be destroyed on the thread it was created on.
    156     destructor_runner_->DeleteSoon(FROM_HERE, this);
    157   }
    158 
    159   scoped_ptr<Socket> socket1_;
    160   scoped_ptr<Socket> socket2_;
    161   scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
    162   base::Thread thread_;
    163 };
    164 
    165 }  // namespace
    166 
    167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) {
    168   (new Forwarder(socket1.Pass(), socket2.Pass()))->Start();
    169 }
    170 
    171 }  // namespace forwarder2
    172