1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "tools/android/forwarder2/forwarders_manager.h" 6 7 #include <sys/select.h> 8 #include <unistd.h> 9 10 #include <algorithm> 11 12 #include "base/basictypes.h" 13 #include "base/bind.h" 14 #include "base/callback_helpers.h" 15 #include "base/location.h" 16 #include "base/logging.h" 17 #include "base/message_loop/message_loop_proxy.h" 18 #include "base/posix/eintr_wrapper.h" 19 #include "tools/android/forwarder2/forwarder.h" 20 #include "tools/android/forwarder2/socket.h" 21 22 namespace forwarder2 { 23 24 ForwardersManager::ForwardersManager() : thread_("ForwardersManagerThread") { 25 thread_.Start(); 26 WaitForEventsOnInternalThreadSoon(); 27 } 28 29 30 ForwardersManager::~ForwardersManager() { 31 deletion_notifier_.Notify(); 32 } 33 34 void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1, 35 scoped_ptr<Socket> socket2) { 36 // Note that the internal Forwarder vector is populated on the internal thread 37 // which is the only thread from which it's accessed. 38 thread_.message_loop_proxy()->PostTask( 39 FROM_HERE, 40 base::Bind(&ForwardersManager::CreateNewForwarderOnInternalThread, 41 base::Unretained(this), base::Passed(&socket1), 42 base::Passed(&socket2))); 43 44 // Guarantees that the CreateNewForwarderOnInternalThread callback posted to 45 // the internal thread gets executed immediately. 46 wakeup_notifier_.Notify(); 47 } 48 49 void ForwardersManager::CreateNewForwarderOnInternalThread( 50 scoped_ptr<Socket> socket1, 51 scoped_ptr<Socket> socket2) { 52 DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread()); 53 forwarders_.push_back(new Forwarder(socket1.Pass(), socket2.Pass())); 54 } 55 56 void ForwardersManager::WaitForEventsOnInternalThreadSoon() { 57 thread_.message_loop_proxy()->PostTask( 58 FROM_HERE, 59 base::Bind(&ForwardersManager::WaitForEventsOnInternalThread, 60 base::Unretained(this))); 61 } 62 63 void ForwardersManager::WaitForEventsOnInternalThread() { 64 DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread()); 65 fd_set read_fds; 66 fd_set write_fds; 67 68 FD_ZERO(&read_fds); 69 FD_ZERO(&write_fds); 70 71 // Populate the file descriptor sets. 72 int max_fd = -1; 73 for (ScopedVector<Forwarder>::iterator it = forwarders_.begin(); 74 it != forwarders_.end(); ++it) { 75 Forwarder* const forwarder = *it; 76 forwarder->RegisterFDs(&read_fds, &write_fds, &max_fd); 77 } 78 79 const int notifier_fds[] = { 80 wakeup_notifier_.receiver_fd(), 81 deletion_notifier_.receiver_fd(), 82 }; 83 84 for (int i = 0; i < arraysize(notifier_fds); ++i) { 85 const int notifier_fd = notifier_fds[i]; 86 DCHECK_GT(notifier_fd, -1); 87 FD_SET(notifier_fd, &read_fds); 88 max_fd = std::max(max_fd, notifier_fd); 89 } 90 91 const int ret = HANDLE_EINTR( 92 select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)); 93 if (ret < 0) { 94 PLOG(ERROR) << "select"; 95 return; 96 } 97 98 const bool must_shutdown = FD_ISSET( 99 deletion_notifier_.receiver_fd(), &read_fds); 100 if (must_shutdown && forwarders_.empty()) 101 return; 102 103 base::ScopedClosureRunner wait_for_events_soon( 104 base::Bind(&ForwardersManager::WaitForEventsOnInternalThreadSoon, 105 base::Unretained(this))); 106 107 if (FD_ISSET(wakeup_notifier_.receiver_fd(), &read_fds)) { 108 // Note that the events on FDs other than the wakeup notifier one, if any, 109 // will be processed upon the next select(). 110 wakeup_notifier_.Reset(); 111 return; 112 } 113 114 // Notify the Forwarder instances and remove the ones that are closed. 115 for (size_t i = 0; i < forwarders_.size(); ) { 116 Forwarder* const forwarder = forwarders_[i]; 117 forwarder->ProcessEvents(read_fds, write_fds); 118 119 if (must_shutdown) 120 forwarder->Shutdown(); 121 122 if (!forwarder->IsClosed()) { 123 ++i; 124 continue; 125 } 126 127 std::swap(forwarders_[i], forwarders_.back()); 128 forwarders_.pop_back(); 129 } 130 } 131 132 } // namespace forwarder2 133