Home | History | Annotate | Download | only in forwarder2
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "tools/android/forwarder2/forwarders_manager.h"
      6 
      7 #include <sys/select.h>
      8 #include <unistd.h>
      9 
     10 #include <algorithm>
     11 
     12 #include "base/basictypes.h"
     13 #include "base/bind.h"
     14 #include "base/callback_helpers.h"
     15 #include "base/location.h"
     16 #include "base/logging.h"
     17 #include "base/message_loop/message_loop_proxy.h"
     18 #include "base/posix/eintr_wrapper.h"
     19 #include "tools/android/forwarder2/forwarder.h"
     20 #include "tools/android/forwarder2/socket.h"
     21 
     22 namespace forwarder2 {
     23 
     24 ForwardersManager::ForwardersManager() : thread_("ForwardersManagerThread") {
     25   thread_.Start();
     26   WaitForEventsOnInternalThreadSoon();
     27 }
     28 
     29 
     30 ForwardersManager::~ForwardersManager() {
     31   deletion_notifier_.Notify();
     32 }
     33 
     34 void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
     35                                                    scoped_ptr<Socket> socket2) {
     36   // Note that the internal Forwarder vector is populated on the internal thread
     37   // which is the only thread from which it's accessed.
     38   thread_.message_loop_proxy()->PostTask(
     39       FROM_HERE,
     40       base::Bind(&ForwardersManager::CreateNewForwarderOnInternalThread,
     41                  base::Unretained(this), base::Passed(&socket1),
     42                  base::Passed(&socket2)));
     43 
     44   // Guarantees that the CreateNewForwarderOnInternalThread callback posted to
     45   // the internal thread gets executed immediately.
     46   wakeup_notifier_.Notify();
     47 }
     48 
     49 void ForwardersManager::CreateNewForwarderOnInternalThread(
     50     scoped_ptr<Socket> socket1,
     51     scoped_ptr<Socket> socket2) {
     52   DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
     53   forwarders_.push_back(new Forwarder(socket1.Pass(), socket2.Pass()));
     54 }
     55 
     56 void ForwardersManager::WaitForEventsOnInternalThreadSoon() {
     57   thread_.message_loop_proxy()->PostTask(
     58       FROM_HERE,
     59       base::Bind(&ForwardersManager::WaitForEventsOnInternalThread,
     60                  base::Unretained(this)));
     61 }
     62 
     63 void ForwardersManager::WaitForEventsOnInternalThread() {
     64   DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
     65   fd_set read_fds;
     66   fd_set write_fds;
     67 
     68   FD_ZERO(&read_fds);
     69   FD_ZERO(&write_fds);
     70 
     71   // Populate the file descriptor sets.
     72   int max_fd = -1;
     73   for (ScopedVector<Forwarder>::iterator it = forwarders_.begin();
     74        it != forwarders_.end(); ++it) {
     75     Forwarder* const forwarder = *it;
     76     forwarder->RegisterFDs(&read_fds, &write_fds, &max_fd);
     77   }
     78 
     79   const int notifier_fds[] = {
     80     wakeup_notifier_.receiver_fd(),
     81     deletion_notifier_.receiver_fd(),
     82   };
     83 
     84   for (int i = 0; i < arraysize(notifier_fds); ++i) {
     85     const int notifier_fd = notifier_fds[i];
     86     DCHECK_GT(notifier_fd, -1);
     87     FD_SET(notifier_fd, &read_fds);
     88     max_fd = std::max(max_fd, notifier_fd);
     89   }
     90 
     91   const int ret = HANDLE_EINTR(
     92       select(max_fd + 1, &read_fds, &write_fds, NULL, NULL));
     93   if (ret < 0) {
     94     PLOG(ERROR) << "select";
     95     return;
     96   }
     97 
     98   const bool must_shutdown = FD_ISSET(
     99       deletion_notifier_.receiver_fd(), &read_fds);
    100   if (must_shutdown && forwarders_.empty())
    101     return;
    102 
    103   base::ScopedClosureRunner wait_for_events_soon(
    104       base::Bind(&ForwardersManager::WaitForEventsOnInternalThreadSoon,
    105                  base::Unretained(this)));
    106 
    107   if (FD_ISSET(wakeup_notifier_.receiver_fd(), &read_fds)) {
    108     // Note that the events on FDs other than the wakeup notifier one, if any,
    109     // will be processed upon the next select().
    110     wakeup_notifier_.Reset();
    111     return;
    112   }
    113 
    114   // Notify the Forwarder instances and remove the ones that are closed.
    115   for (size_t i = 0; i < forwarders_.size(); ) {
    116     Forwarder* const forwarder = forwarders_[i];
    117     forwarder->ProcessEvents(read_fds, write_fds);
    118 
    119     if (must_shutdown)
    120       forwarder->Shutdown();
    121 
    122     if (!forwarder->IsClosed()) {
    123       ++i;
    124       continue;
    125     }
    126 
    127     std::swap(forwarders_[i], forwarders_.back());
    128     forwarders_.pop_back();
    129   }
    130 }
    131 
    132 }  // namespace forwarder2
    133