1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "tools/android/forwarder2/forwarder.h" 6 7 #include "base/basictypes.h" 8 #include "base/bind.h" 9 #include "base/logging.h" 10 #include "base/memory/ref_counted.h" 11 #include "base/posix/eintr_wrapper.h" 12 #include "base/single_thread_task_runner.h" 13 #include "tools/android/forwarder2/socket.h" 14 15 namespace forwarder2 { 16 namespace { 17 18 // Helper class to buffer reads and writes from one socket to another. 19 class BufferedCopier { 20 public: 21 // Does NOT own the pointers. 22 BufferedCopier(Socket* socket_from, 23 Socket* socket_to) 24 : socket_from_(socket_from), 25 socket_to_(socket_to), 26 bytes_read_(0), 27 write_offset_(0) { 28 } 29 30 bool AddToReadSet(fd_set* read_fds) { 31 if (bytes_read_ == 0) 32 return socket_from_->AddFdToSet(read_fds); 33 return false; 34 } 35 36 bool AddToWriteSet(fd_set* write_fds) { 37 if (write_offset_ < bytes_read_) 38 return socket_to_->AddFdToSet(write_fds); 39 return false; 40 } 41 42 bool TryRead(const fd_set& read_fds) { 43 if (!socket_from_->IsFdInSet(read_fds)) 44 return false; 45 if (bytes_read_ != 0) // Can't read. 46 return false; 47 int ret = socket_from_->Read(buffer_, kBufferSize); 48 if (ret > 0) { 49 bytes_read_ = ret; 50 return true; 51 } 52 return false; 53 } 54 55 bool TryWrite(const fd_set& write_fds) { 56 if (!socket_to_->IsFdInSet(write_fds)) 57 return false; 58 if (write_offset_ >= bytes_read_) // Nothing to write. 59 return false; 60 int ret = socket_to_->Write(buffer_ + write_offset_, 61 bytes_read_ - write_offset_); 62 if (ret > 0) { 63 write_offset_ += ret; 64 if (write_offset_ == bytes_read_) { 65 write_offset_ = 0; 66 bytes_read_ = 0; 67 } 68 return true; 69 } 70 return false; 71 } 72 73 private: 74 // Not owned. 75 Socket* socket_from_; 76 Socket* socket_to_; 77 78 // A big buffer to let our file-over-http bridge work more like real file. 79 static const int kBufferSize = 1024 * 128; 80 int bytes_read_; 81 int write_offset_; 82 char buffer_[kBufferSize]; 83 84 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); 85 }; 86 87 // Internal class that wraps a helper thread to forward traffic between 88 // |socket1| and |socket2|. After creating a new instance, call its Start() 89 // method to launch operations. Thread stops automatically if one of the socket 90 // disconnects, but ensures that all buffered writes to the other, still alive, 91 // socket, are written first. When this happens, the instance will delete itself 92 // automatically. 93 // Note that the instance will always be destroyed on the same thread that 94 // created it. 95 class Forwarder { 96 public: 97 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) 98 : socket1_(socket1.Pass()), 99 socket2_(socket2.Pass()), 100 destructor_runner_(base::MessageLoopProxy::current()), 101 thread_("ForwarderThread") { 102 } 103 104 void Start() { 105 thread_.Start(); 106 thread_.message_loop_proxy()->PostTask( 107 FROM_HERE, 108 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); 109 } 110 111 private: 112 void ThreadHandler() { 113 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; 114 fd_set read_fds; 115 fd_set write_fds; 116 117 // Copy from socket1 to socket2 118 BufferedCopier buffer1(socket1_.get(), socket2_.get()); 119 // Copy from socket2 to socket1 120 BufferedCopier buffer2(socket2_.get(), socket1_.get()); 121 122 bool run = true; 123 while (run) { 124 FD_ZERO(&read_fds); 125 FD_ZERO(&write_fds); 126 127 buffer1.AddToReadSet(&read_fds); 128 buffer2.AddToReadSet(&read_fds); 129 buffer1.AddToWriteSet(&write_fds); 130 buffer2.AddToWriteSet(&write_fds); 131 132 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { 133 PLOG(ERROR) << "select"; 134 break; 135 } 136 // When a socket in the read set closes the connection, select() returns 137 // with that socket descriptor set as "ready to read". When we call 138 // TryRead() below, it will return false, but the while loop will continue 139 // to run until all the write operations are finished, to make sure the 140 // buffers are completely flushed out. 141 142 // Keep running while we have some operation to do. 143 run = buffer1.TryRead(read_fds); 144 run = run || buffer2.TryRead(read_fds); 145 run = run || buffer1.TryWrite(write_fds); 146 run = run || buffer2.TryWrite(write_fds); 147 } 148 149 // Note that the thread that |destruction_runner_| runs tasks on could be 150 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close 151 // the sockets now rather than relying on the destructor. 152 socket1_.reset(); 153 socket2_.reset(); 154 155 // Note that base::Thread must be destroyed on the thread it was created on. 156 destructor_runner_->DeleteSoon(FROM_HERE, this); 157 } 158 159 scoped_ptr<Socket> socket1_; 160 scoped_ptr<Socket> socket2_; 161 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; 162 base::Thread thread_; 163 }; 164 165 } // namespace 166 167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { 168 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); 169 } 170 171 } // namespace forwarder2 172