1 /* http://frotznet.googlecode.com/svn/trunk/utils/fdevent.c 2 ** 3 ** Copyright 2006, Brian Swetland <swetland (at) frotz.net> 4 ** 5 ** Licensed under the Apache License, Version 2.0 (the "License"); 6 ** you may not use this file except in compliance with the License. 7 ** You may obtain a copy of the License at 8 ** 9 ** http://www.apache.org/licenses/LICENSE-2.0 10 ** 11 ** Unless required by applicable law or agreed to in writing, software 12 ** distributed under the License is distributed on an "AS IS" BASIS, 13 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 ** See the License for the specific language governing permissions and 15 ** limitations under the License. 16 */ 17 18 #define TRACE_TAG FDEVENT 19 20 #include "sysdeps.h" 21 #include "fdevent.h" 22 23 #include <fcntl.h> 24 #include <stdlib.h> 25 #include <string.h> 26 #include <unistd.h> 27 28 #include <atomic> 29 #include <functional> 30 #include <list> 31 #include <mutex> 32 #include <unordered_map> 33 #include <vector> 34 35 #include <android-base/logging.h> 36 #include <android-base/stringprintf.h> 37 #include <android-base/thread_annotations.h> 38 39 #include "adb_io.h" 40 #include "adb_trace.h" 41 #include "adb_unique_fd.h" 42 #include "adb_utils.h" 43 44 #if !ADB_HOST 45 // This socket is used when a subproc shell service exists. 46 // It wakes up the fdevent_loop() and cause the correct handling 47 // of the shell's pseudo-tty master. I.e. force close it. 48 int SHELL_EXIT_NOTIFY_FD = -1; 49 #endif // !ADB_HOST 50 51 #define FDE_EVENTMASK 0x00ff 52 #define FDE_STATEMASK 0xff00 53 54 #define FDE_ACTIVE 0x0100 55 #define FDE_PENDING 0x0200 56 #define FDE_CREATED 0x0400 57 58 struct PollNode { 59 fdevent* fde; 60 adb_pollfd pollfd; 61 62 explicit PollNode(fdevent* fde) : fde(fde) { 63 memset(&pollfd, 0, sizeof(pollfd)); 64 pollfd.fd = fde->fd; 65 66 #if defined(__linux__) 67 // Always enable POLLRDHUP, so the host server can take action when some clients disconnect. 68 // Then we can avoid leaving many sockets in CLOSE_WAIT state. See http://b/23314034. 69 pollfd.events = POLLRDHUP; 70 #endif 71 } 72 }; 73 74 // All operations to fdevent should happen only in the main thread. 75 // That's why we don't need a lock for fdevent. 76 static auto& g_poll_node_map = *new std::unordered_map<int, PollNode>(); 77 static auto& g_pending_list = *new std::list<fdevent*>(); 78 static std::atomic<bool> terminate_loop(false); 79 static bool main_thread_valid; 80 static unsigned long main_thread_id; 81 82 static auto& run_queue_notify_fd = *new unique_fd(); 83 static auto& run_queue_mutex = *new std::mutex(); 84 static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::vector<std::function<void()>>(); 85 86 void check_main_thread() { 87 if (main_thread_valid) { 88 CHECK_EQ(main_thread_id, adb_thread_id()); 89 } 90 } 91 92 void set_main_thread() { 93 main_thread_valid = true; 94 main_thread_id = adb_thread_id(); 95 } 96 97 static std::string dump_fde(const fdevent* fde) { 98 std::string state; 99 if (fde->state & FDE_ACTIVE) { 100 state += "A"; 101 } 102 if (fde->state & FDE_PENDING) { 103 state += "P"; 104 } 105 if (fde->state & FDE_CREATED) { 106 state += "C"; 107 } 108 if (fde->state & FDE_READ) { 109 state += "R"; 110 } 111 if (fde->state & FDE_WRITE) { 112 state += "W"; 113 } 114 if (fde->state & FDE_ERROR) { 115 state += "E"; 116 } 117 if (fde->state & FDE_DONT_CLOSE) { 118 state += "D"; 119 } 120 return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str()); 121 } 122 123 fdevent* fdevent_create(int fd, fd_func func, void* arg) { 124 check_main_thread(); 125 fdevent *fde = (fdevent*) malloc(sizeof(fdevent)); 126 if(fde == 0) return 0; 127 fdevent_install(fde, fd, func, arg); 128 fde->state |= FDE_CREATED; 129 return fde; 130 } 131 132 void fdevent_destroy(fdevent* fde) { 133 check_main_thread(); 134 if(fde == 0) return; 135 if(!(fde->state & FDE_CREATED)) { 136 LOG(FATAL) << "destroying fde not created by fdevent_create(): " << dump_fde(fde); 137 } 138 fdevent_remove(fde); 139 free(fde); 140 } 141 142 void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) { 143 check_main_thread(); 144 CHECK_GE(fd, 0); 145 memset(fde, 0, sizeof(fdevent)); 146 fde->state = FDE_ACTIVE; 147 fde->fd = fd; 148 fde->func = func; 149 fde->arg = arg; 150 if (!set_file_block_mode(fd, false)) { 151 // Here is not proper to handle the error. If it fails here, some error is 152 // likely to be detected by poll(), then we can let the callback function 153 // to handle it. 154 LOG(ERROR) << "failed to set non-blocking mode for fd " << fd; 155 } 156 auto pair = g_poll_node_map.emplace(fde->fd, PollNode(fde)); 157 CHECK(pair.second) << "install existing fd " << fd; 158 D("fdevent_install %s", dump_fde(fde).c_str()); 159 } 160 161 void fdevent_remove(fdevent* fde) { 162 check_main_thread(); 163 D("fdevent_remove %s", dump_fde(fde).c_str()); 164 if (fde->state & FDE_ACTIVE) { 165 g_poll_node_map.erase(fde->fd); 166 if (fde->state & FDE_PENDING) { 167 g_pending_list.remove(fde); 168 } 169 if (!(fde->state & FDE_DONT_CLOSE)) { 170 adb_close(fde->fd); 171 fde->fd = -1; 172 } 173 fde->state = 0; 174 fde->events = 0; 175 } 176 } 177 178 static void fdevent_update(fdevent* fde, unsigned events) { 179 auto it = g_poll_node_map.find(fde->fd); 180 CHECK(it != g_poll_node_map.end()); 181 PollNode& node = it->second; 182 if (events & FDE_READ) { 183 node.pollfd.events |= POLLIN; 184 } else { 185 node.pollfd.events &= ~POLLIN; 186 } 187 188 if (events & FDE_WRITE) { 189 node.pollfd.events |= POLLOUT; 190 } else { 191 node.pollfd.events &= ~POLLOUT; 192 } 193 fde->state = (fde->state & FDE_STATEMASK) | events; 194 } 195 196 void fdevent_set(fdevent* fde, unsigned events) { 197 check_main_thread(); 198 events &= FDE_EVENTMASK; 199 if ((fde->state & FDE_EVENTMASK) == events) { 200 return; 201 } 202 CHECK(fde->state & FDE_ACTIVE); 203 fdevent_update(fde, events); 204 D("fdevent_set: %s, events = %u", dump_fde(fde).c_str(), events); 205 206 if (fde->state & FDE_PENDING) { 207 // If we are pending, make sure we don't signal an event that is no longer wanted. 208 fde->events &= events; 209 if (fde->events == 0) { 210 g_pending_list.remove(fde); 211 fde->state &= ~FDE_PENDING; 212 } 213 } 214 } 215 216 void fdevent_add(fdevent* fde, unsigned events) { 217 check_main_thread(); 218 fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); 219 } 220 221 void fdevent_del(fdevent* fde, unsigned events) { 222 check_main_thread(); 223 fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); 224 } 225 226 static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { 227 std::string result; 228 for (const auto& pollfd : pollfds) { 229 std::string op; 230 if (pollfd.events & POLLIN) { 231 op += "R"; 232 } 233 if (pollfd.events & POLLOUT) { 234 op += "W"; 235 } 236 android::base::StringAppendF(&result, " %d(%s)", pollfd.fd, op.c_str()); 237 } 238 return result; 239 } 240 241 static void fdevent_process() { 242 std::vector<adb_pollfd> pollfds; 243 for (const auto& pair : g_poll_node_map) { 244 pollfds.push_back(pair.second.pollfd); 245 } 246 CHECK_GT(pollfds.size(), 0u); 247 D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); 248 int ret = adb_poll(&pollfds[0], pollfds.size(), -1); 249 if (ret == -1) { 250 PLOG(ERROR) << "poll(), ret = " << ret; 251 return; 252 } 253 for (const auto& pollfd : pollfds) { 254 if (pollfd.revents != 0) { 255 D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); 256 } 257 unsigned events = 0; 258 if (pollfd.revents & POLLIN) { 259 events |= FDE_READ; 260 } 261 if (pollfd.revents & POLLOUT) { 262 events |= FDE_WRITE; 263 } 264 if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { 265 // We fake a read, as the rest of the code assumes that errors will 266 // be detected at that point. 267 events |= FDE_READ | FDE_ERROR; 268 } 269 #if defined(__linux__) 270 if (pollfd.revents & POLLRDHUP) { 271 events |= FDE_READ | FDE_ERROR; 272 } 273 #endif 274 if (events != 0) { 275 auto it = g_poll_node_map.find(pollfd.fd); 276 CHECK(it != g_poll_node_map.end()); 277 fdevent* fde = it->second.fde; 278 CHECK_EQ(fde->fd, pollfd.fd); 279 fde->events |= events; 280 D("%s got events %x", dump_fde(fde).c_str(), events); 281 fde->state |= FDE_PENDING; 282 g_pending_list.push_back(fde); 283 } 284 } 285 } 286 287 static void fdevent_call_fdfunc(fdevent* fde) { 288 unsigned events = fde->events; 289 fde->events = 0; 290 CHECK(fde->state & FDE_PENDING); 291 fde->state &= (~FDE_PENDING); 292 D("fdevent_call_fdfunc %s", dump_fde(fde).c_str()); 293 fde->func(fde->fd, events, fde->arg); 294 } 295 296 #if !ADB_HOST 297 298 #include <sys/ioctl.h> 299 300 static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) { 301 D("subproc handling on fd = %d, ev = %x", fd, ev); 302 303 CHECK_GE(fd, 0); 304 305 if (ev & FDE_READ) { 306 int subproc_fd; 307 308 if(!ReadFdExactly(fd, &subproc_fd, sizeof(subproc_fd))) { 309 LOG(FATAL) << "Failed to read the subproc's fd from " << fd; 310 } 311 auto it = g_poll_node_map.find(subproc_fd); 312 if (it == g_poll_node_map.end()) { 313 D("subproc_fd %d cleared from fd_table", subproc_fd); 314 adb_close(subproc_fd); 315 return; 316 } 317 fdevent* subproc_fde = it->second.fde; 318 if(subproc_fde->fd != subproc_fd) { 319 // Already reallocated? 320 LOG(FATAL) << "subproc_fd(" << subproc_fd << ") != subproc_fde->fd(" << subproc_fde->fd 321 << ")"; 322 return; 323 } 324 325 subproc_fde->force_eof = 1; 326 327 int rcount = 0; 328 ioctl(subproc_fd, FIONREAD, &rcount); 329 D("subproc with fd %d has rcount=%d, err=%d", subproc_fd, rcount, errno); 330 if (rcount != 0) { 331 // If there is data left, it will show up in the select(). 332 // This works because there is no other thread reading that 333 // data when in this fd_func(). 334 return; 335 } 336 337 D("subproc_fde %s", dump_fde(subproc_fde).c_str()); 338 subproc_fde->events |= FDE_READ; 339 if(subproc_fde->state & FDE_PENDING) { 340 return; 341 } 342 subproc_fde->state |= FDE_PENDING; 343 fdevent_call_fdfunc(subproc_fde); 344 } 345 } 346 347 static void fdevent_subproc_setup() { 348 int s[2]; 349 350 if(adb_socketpair(s)) { 351 PLOG(FATAL) << "cannot create shell-exit socket-pair"; 352 } 353 D("fdevent_subproc: socket pair (%d, %d)", s[0], s[1]); 354 355 SHELL_EXIT_NOTIFY_FD = s[0]; 356 fdevent *fde = fdevent_create(s[1], fdevent_subproc_event_func, NULL); 357 CHECK(fde != nullptr) << "cannot create fdevent for shell-exit handler"; 358 fdevent_add(fde, FDE_READ); 359 } 360 #endif // !ADB_HOST 361 362 static void fdevent_run_flush() REQUIRES(run_queue_mutex) { 363 for (auto& f : run_queue) { 364 f(); 365 } 366 run_queue.clear(); 367 } 368 369 static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) { 370 CHECK_GE(fd, 0); 371 CHECK(ev & FDE_READ); 372 373 char buf[1024]; 374 375 // Empty the fd. 376 if (adb_read(fd, buf, sizeof(buf)) == -1) { 377 PLOG(FATAL) << "failed to empty run queue notify fd"; 378 } 379 380 std::lock_guard<std::mutex> lock(run_queue_mutex); 381 fdevent_run_flush(); 382 } 383 384 static void fdevent_run_setup() { 385 std::lock_guard<std::mutex> lock(run_queue_mutex); 386 CHECK(run_queue_notify_fd.get() == -1); 387 int s[2]; 388 if (adb_socketpair(s) != 0) { 389 PLOG(FATAL) << "failed to create run queue notify socketpair"; 390 } 391 392 run_queue_notify_fd.reset(s[0]); 393 fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr); 394 CHECK(fde != nullptr); 395 fdevent_add(fde, FDE_READ); 396 397 fdevent_run_flush(); 398 } 399 400 void fdevent_run_on_main_thread(std::function<void()> fn) { 401 std::lock_guard<std::mutex> lock(run_queue_mutex); 402 run_queue.push_back(std::move(fn)); 403 404 // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. 405 // In that case, rely on the setup code to flush the queue without a notification being needed. 406 if (run_queue_notify_fd != -1) { 407 if (adb_write(run_queue_notify_fd.get(), "", 1) != 1) { 408 PLOG(FATAL) << "failed to write to run queue notify fd"; 409 } 410 } 411 } 412 413 void fdevent_loop() { 414 set_main_thread(); 415 #if !ADB_HOST 416 fdevent_subproc_setup(); 417 #endif // !ADB_HOST 418 fdevent_run_setup(); 419 420 while (true) { 421 if (terminate_loop) { 422 return; 423 } 424 425 D("--- --- waiting for events"); 426 427 fdevent_process(); 428 429 while (!g_pending_list.empty()) { 430 fdevent* fde = g_pending_list.front(); 431 g_pending_list.pop_front(); 432 fdevent_call_fdfunc(fde); 433 } 434 } 435 } 436 437 void fdevent_terminate_loop() { 438 terminate_loop = true; 439 } 440 441 size_t fdevent_installed_count() { 442 return g_poll_node_map.size(); 443 } 444 445 void fdevent_reset() { 446 g_poll_node_map.clear(); 447 g_pending_list.clear(); 448 449 std::lock_guard<std::mutex> lock(run_queue_mutex); 450 run_queue_notify_fd.reset(); 451 run_queue.clear(); 452 453 main_thread_valid = false; 454 terminate_loop = false; 455 } 456