1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "tools/android/forwarder2/forwarder.h" 6 7 #include "base/basictypes.h" 8 #include "base/logging.h" 9 #include "base/posix/eintr_wrapper.h" 10 #include "tools/android/forwarder2/socket.h" 11 12 namespace forwarder2 { 13 namespace { 14 15 const int kBufferSize = 32 * 1024; 16 17 } // namespace 18 19 20 // Helper class to buffer reads and writes from one socket to another. 21 // Each implements a small buffer connected two one input socket, and 22 // one output socket. 23 // 24 // socket_from_ ---> [BufferedCopier] ---> socket_to_ 25 // 26 // These objects are used in a pair to handle duplex traffic, as in: 27 // 28 // ------> [BufferedCopier_1] ---> 29 // / \ 30 // socket_1 * * socket_2 31 // \ / 32 // <------ [BufferedCopier_2] <---- 33 // 34 // When a BufferedCopier is in the READING state (see below), it only listens 35 // to events on its input socket, and won't detect when its output socket 36 // disconnects. To work around this, its peer will call its Close() method 37 // when that happens. 38 39 class Forwarder::BufferedCopier { 40 public: 41 // Possible states: 42 // READING - Empty buffer and Waiting for input. 43 // WRITING - Data in buffer, and waiting for output. 44 // CLOSING - Like WRITING, but do not try to read after that. 45 // CLOSED - Completely closed. 46 // 47 // State transitions are: 48 // 49 // T01: READING ---[receive data]---> WRITING 50 // T02: READING ---[error on input socket]---> CLOSED 51 // T03: READING ---[Close() call]---> CLOSED 52 // 53 // T04: WRITING ---[write partial data]---> WRITING 54 // T05: WRITING ---[write all data]----> READING 55 // T06: WRITING ---[error on output socket]----> CLOSED 56 // T07: WRITING ---[Close() call]---> CLOSING 57 // 58 // T08: CLOSING ---[write partial data]---> CLOSING 59 // T09: CLOSING ---[write all data]----> CLOSED 60 // T10: CLOSING ---[Close() call]---> CLOSING 61 // T11: CLOSING ---[error on output socket] ---> CLOSED 62 // 63 enum State { 64 STATE_READING = 0, 65 STATE_WRITING = 1, 66 STATE_CLOSING = 2, 67 STATE_CLOSED = 3, 68 }; 69 70 // Does NOT own the pointers. 71 BufferedCopier(Socket* socket_from, Socket* socket_to) 72 : socket_from_(socket_from), 73 socket_to_(socket_to), 74 bytes_read_(0), 75 write_offset_(0), 76 peer_(NULL), 77 state_(STATE_READING) {} 78 79 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. 80 void SetPeer(BufferedCopier* peer) { 81 DCHECK(!peer_); 82 peer_ = peer; 83 } 84 85 bool is_closed() const { return state_ == STATE_CLOSED; } 86 87 // Gently asks to close a buffer. Called either by the peer or the forwarder. 88 void Close() { 89 switch (state_) { 90 case STATE_READING: 91 state_ = STATE_CLOSED; // T03 92 break; 93 case STATE_WRITING: 94 state_ = STATE_CLOSING; // T07 95 break; 96 case STATE_CLOSING: 97 break; // T10 98 case STATE_CLOSED: 99 ; 100 } 101 } 102 103 // Call this before select(). This updates |read_fds|, 104 // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed. 105 void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) { 106 int fd; 107 switch (state_) { 108 case STATE_READING: 109 DCHECK(bytes_read_ == 0); 110 DCHECK(write_offset_ == 0); 111 fd = socket_from_->fd(); 112 if (fd < 0) { 113 ForceClose(); // T02 114 return; 115 } 116 FD_SET(fd, read_fds); 117 break; 118 119 case STATE_WRITING: 120 case STATE_CLOSING: 121 DCHECK(bytes_read_ > 0); 122 DCHECK(write_offset_ < bytes_read_); 123 fd = socket_to_->fd(); 124 if (fd < 0) { 125 ForceClose(); // T06 126 return; 127 } 128 FD_SET(fd, write_fds); 129 break; 130 131 case STATE_CLOSED: 132 return; 133 } 134 *max_fd = std::max(*max_fd, fd); 135 } 136 137 // Call this after a select() call to operate over the buffer. 138 void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) { 139 int fd, ret; 140 switch (state_) { 141 case STATE_READING: 142 fd = socket_from_->fd(); 143 if (fd < 0) { 144 state_ = STATE_CLOSED; // T02 145 return; 146 } 147 if (!FD_ISSET(fd, &read_fds)) 148 return; 149 150 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize); 151 if (ret <= 0) { 152 ForceClose(); // T02 153 return; 154 } 155 bytes_read_ = ret; 156 write_offset_ = 0; 157 state_ = STATE_WRITING; // T01 158 break; 159 160 case STATE_WRITING: 161 case STATE_CLOSING: 162 fd = socket_to_->fd(); 163 if (fd < 0) { 164 ForceClose(); // T06 + T11 165 return; 166 } 167 if (!FD_ISSET(fd, &write_fds)) 168 return; 169 170 ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_, 171 bytes_read_ - write_offset_); 172 if (ret <= 0) { 173 ForceClose(); // T06 + T11 174 return; 175 } 176 177 write_offset_ += ret; 178 if (write_offset_ < bytes_read_) 179 return; // T08 + T04 180 181 write_offset_ = 0; 182 bytes_read_ = 0; 183 if (state_ == STATE_CLOSING) { 184 ForceClose(); // T09 185 return; 186 } 187 state_ = STATE_READING; // T05 188 break; 189 190 case STATE_CLOSED: 191 ; 192 } 193 } 194 195 private: 196 // Internal method used to close the buffer and notify the peer, if any. 197 void ForceClose() { 198 if (peer_) { 199 peer_->Close(); 200 peer_ = NULL; 201 } 202 state_ = STATE_CLOSED; 203 } 204 205 // Not owned. 206 Socket* socket_from_; 207 Socket* socket_to_; 208 209 int bytes_read_; 210 int write_offset_; 211 BufferedCopier* peer_; 212 State state_; 213 char buffer_[kBufferSize]; 214 215 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); 216 }; 217 218 Forwarder::Forwarder(scoped_ptr<Socket> socket1, 219 scoped_ptr<Socket> socket2) 220 : socket1_(socket1.Pass()), 221 socket2_(socket2.Pass()), 222 buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())), 223 buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) { 224 buffer1_->SetPeer(buffer2_.get()); 225 buffer2_->SetPeer(buffer1_.get()); 226 } 227 228 Forwarder::~Forwarder() { 229 DCHECK(thread_checker_.CalledOnValidThread()); 230 } 231 232 void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) { 233 DCHECK(thread_checker_.CalledOnValidThread()); 234 buffer1_->PrepareSelect(read_fds, write_fds, max_fd); 235 buffer2_->PrepareSelect(read_fds, write_fds, max_fd); 236 } 237 238 void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) { 239 DCHECK(thread_checker_.CalledOnValidThread()); 240 buffer1_->ProcessSelect(read_fds, write_fds); 241 buffer2_->ProcessSelect(read_fds, write_fds); 242 } 243 244 bool Forwarder::IsClosed() const { 245 DCHECK(thread_checker_.CalledOnValidThread()); 246 return buffer1_->is_closed() && buffer2_->is_closed(); 247 } 248 249 void Forwarder::Shutdown() { 250 DCHECK(thread_checker_.CalledOnValidThread()); 251 buffer1_->Close(); 252 buffer2_->Close(); 253 } 254 255 } // namespace forwarder2 256