Home | History | Annotate | Download | only in forwarder2
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "tools/android/forwarder2/forwarder.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/logging.h"
      9 #include "base/posix/eintr_wrapper.h"
     10 #include "tools/android/forwarder2/socket.h"
     11 
     12 namespace forwarder2 {
     13 namespace {
     14 
     15 const int kBufferSize = 32 * 1024;
     16 
     17 }  // namespace
     18 
     19 
     20 // Helper class to buffer reads and writes from one socket to another.
     21 // Each implements a small buffer connected two one input socket, and
     22 // one output socket.
     23 //
     24 //   socket_from_ ---> [BufferedCopier] ---> socket_to_
     25 //
     26 // These objects are used in a pair to handle duplex traffic, as in:
     27 //
     28 //                    ------> [BufferedCopier_1] --->
     29 //                  /                                \
     30 //      socket_1   *                                  * socket_2
     31 //                  \                                /
     32 //                   <------ [BufferedCopier_2] <----
     33 //
     34 // When a BufferedCopier is in the READING state (see below), it only listens
     35 // to events on its input socket, and won't detect when its output socket
     36 // disconnects. To work around this, its peer will call its Close() method
     37 // when that happens.
     38 
     39 class Forwarder::BufferedCopier {
     40  public:
     41   // Possible states:
     42   //    READING - Empty buffer and Waiting for input.
     43   //    WRITING - Data in buffer, and waiting for output.
     44   //    CLOSING - Like WRITING, but do not try to read after that.
     45   //    CLOSED  - Completely closed.
     46   //
     47   // State transitions are:
     48   //
     49   //   T01:  READING ---[receive data]---> WRITING
     50   //   T02:  READING ---[error on input socket]---> CLOSED
     51   //   T03:  READING ---[Close() call]---> CLOSED
     52   //
     53   //   T04:  WRITING ---[write partial data]---> WRITING
     54   //   T05:  WRITING ---[write all data]----> READING
     55   //   T06:  WRITING ---[error on output socket]----> CLOSED
     56   //   T07:  WRITING ---[Close() call]---> CLOSING
     57   //
     58   //   T08:  CLOSING ---[write partial data]---> CLOSING
     59   //   T09:  CLOSING ---[write all data]----> CLOSED
     60   //   T10:  CLOSING ---[Close() call]---> CLOSING
     61   //   T11:  CLOSING ---[error on output socket] ---> CLOSED
     62   //
     63   enum State {
     64     STATE_READING = 0,
     65     STATE_WRITING = 1,
     66     STATE_CLOSING = 2,
     67     STATE_CLOSED = 3,
     68   };
     69 
     70   // Does NOT own the pointers.
     71   BufferedCopier(Socket* socket_from, Socket* socket_to)
     72       : socket_from_(socket_from),
     73         socket_to_(socket_to),
     74         bytes_read_(0),
     75         write_offset_(0),
     76         peer_(NULL),
     77         state_(STATE_READING) {}
     78 
     79   // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
     80   void SetPeer(BufferedCopier* peer) {
     81     DCHECK(!peer_);
     82     peer_ = peer;
     83   }
     84 
     85   bool is_closed() const { return state_ == STATE_CLOSED; }
     86 
     87   // Gently asks to close a buffer. Called either by the peer or the forwarder.
     88   void Close() {
     89     switch (state_) {
     90       case STATE_READING:
     91         state_ = STATE_CLOSED;  // T03
     92         break;
     93       case STATE_WRITING:
     94         state_ = STATE_CLOSING;  // T07
     95         break;
     96       case STATE_CLOSING:
     97         break;  // T10
     98       case STATE_CLOSED:
     99         ;
    100     }
    101   }
    102 
    103   // Call this before select(). This updates |read_fds|,
    104   // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
    105   void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
    106     int fd;
    107     switch (state_) {
    108       case STATE_READING:
    109         DCHECK(bytes_read_ == 0);
    110         DCHECK(write_offset_ == 0);
    111         fd = socket_from_->fd();
    112         if (fd < 0) {
    113           ForceClose();  // T02
    114           return;
    115         }
    116         FD_SET(fd, read_fds);
    117         break;
    118 
    119       case STATE_WRITING:
    120       case STATE_CLOSING:
    121         DCHECK(bytes_read_ > 0);
    122         DCHECK(write_offset_ < bytes_read_);
    123         fd = socket_to_->fd();
    124         if (fd < 0) {
    125           ForceClose();  // T06
    126           return;
    127         }
    128         FD_SET(fd, write_fds);
    129         break;
    130 
    131       case STATE_CLOSED:
    132         return;
    133     }
    134     *max_fd = std::max(*max_fd, fd);
    135   }
    136 
    137   // Call this after a select() call to operate over the buffer.
    138   void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
    139     int fd, ret;
    140     switch (state_) {
    141       case STATE_READING:
    142         fd = socket_from_->fd();
    143         if (fd < 0) {
    144           state_ = STATE_CLOSED;  // T02
    145           return;
    146         }
    147         if (!FD_ISSET(fd, &read_fds))
    148           return;
    149 
    150         ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
    151         if (ret <= 0) {
    152           ForceClose();  // T02
    153           return;
    154         }
    155         bytes_read_ = ret;
    156         write_offset_ = 0;
    157         state_ = STATE_WRITING;  // T01
    158         break;
    159 
    160       case STATE_WRITING:
    161       case STATE_CLOSING:
    162         fd = socket_to_->fd();
    163         if (fd < 0) {
    164           ForceClose();  // T06 + T11
    165           return;
    166         }
    167         if (!FD_ISSET(fd, &write_fds))
    168           return;
    169 
    170         ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
    171                                            bytes_read_ - write_offset_);
    172         if (ret <= 0) {
    173           ForceClose();  // T06 + T11
    174           return;
    175         }
    176 
    177         write_offset_ += ret;
    178         if (write_offset_ < bytes_read_)
    179           return;  // T08 + T04
    180 
    181         write_offset_ = 0;
    182         bytes_read_ = 0;
    183         if (state_ == STATE_CLOSING) {
    184           ForceClose();  // T09
    185           return;
    186         }
    187         state_ = STATE_READING;  // T05
    188         break;
    189 
    190       case STATE_CLOSED:
    191         ;
    192     }
    193   }
    194 
    195  private:
    196   // Internal method used to close the buffer and notify the peer, if any.
    197   void ForceClose() {
    198     if (peer_) {
    199       peer_->Close();
    200       peer_ = NULL;
    201     }
    202     state_ = STATE_CLOSED;
    203   }
    204 
    205   // Not owned.
    206   Socket* socket_from_;
    207   Socket* socket_to_;
    208 
    209   int bytes_read_;
    210   int write_offset_;
    211   BufferedCopier* peer_;
    212   State state_;
    213   char buffer_[kBufferSize];
    214 
    215   DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
    216 };
    217 
    218 Forwarder::Forwarder(scoped_ptr<Socket> socket1,
    219                      scoped_ptr<Socket> socket2)
    220     : socket1_(socket1.Pass()),
    221       socket2_(socket2.Pass()),
    222       buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())),
    223       buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) {
    224   buffer1_->SetPeer(buffer2_.get());
    225   buffer2_->SetPeer(buffer1_.get());
    226 }
    227 
    228 Forwarder::~Forwarder() {
    229   DCHECK(thread_checker_.CalledOnValidThread());
    230 }
    231 
    232 void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
    233   DCHECK(thread_checker_.CalledOnValidThread());
    234   buffer1_->PrepareSelect(read_fds, write_fds, max_fd);
    235   buffer2_->PrepareSelect(read_fds, write_fds, max_fd);
    236 }
    237 
    238 void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) {
    239   DCHECK(thread_checker_.CalledOnValidThread());
    240   buffer1_->ProcessSelect(read_fds, write_fds);
    241   buffer2_->ProcessSelect(read_fds, write_fds);
    242 }
    243 
    244 bool Forwarder::IsClosed() const {
    245   DCHECK(thread_checker_.CalledOnValidThread());
    246   return buffer1_->is_closed() && buffer2_->is_closed();
    247 }
    248 
    249 void Forwarder::Shutdown() {
    250   DCHECK(thread_checker_.CalledOnValidThread());
    251   buffer1_->Close();
    252   buffer2_->Close();
    253 }
    254 
    255 }  // namespace forwarder2
    256