1 // 2 // Copyright 2010 The Android Open Source Project 3 // 4 // A looper implementation based on epoll(). 5 // 6 #define LOG_TAG "Looper" 7 8 //#define LOG_NDEBUG 0 9 10 // Debugs poll and wake interactions. 11 #define DEBUG_POLL_AND_WAKE 0 12 13 // Debugs callback registration and invocation. 14 #define DEBUG_CALLBACKS 0 15 16 #include <cutils/log.h> 17 #include <utils/Looper.h> 18 #include <utils/Timers.h> 19 20 #include <unistd.h> 21 #include <fcntl.h> 22 23 24 namespace android { 25 26 #ifdef LOOPER_USES_EPOLL 27 // Hint for number of file descriptors to be associated with the epoll instance. 28 static const int EPOLL_SIZE_HINT = 8; 29 30 // Maximum number of file descriptors for which to retrieve poll events each iteration. 31 static const int EPOLL_MAX_EVENTS = 16; 32 #endif 33 34 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; 35 static pthread_key_t gTLSKey = 0; 36 37 Looper::Looper(bool allowNonCallbacks) : 38 mAllowNonCallbacks(allowNonCallbacks), 39 mResponseIndex(0) { 40 int wakeFds[2]; 41 int result = pipe(wakeFds); 42 LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); 43 44 mWakeReadPipeFd = wakeFds[0]; 45 mWakeWritePipeFd = wakeFds[1]; 46 47 result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); 48 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", 49 errno); 50 51 result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); 52 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", 53 errno); 54 55 #ifdef LOOPER_USES_EPOLL 56 // Allocate the epoll instance and register the wake pipe. 57 mEpollFd = epoll_create(EPOLL_SIZE_HINT); 58 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); 59 60 struct epoll_event eventItem; 61 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 62 eventItem.events = EPOLLIN; 63 eventItem.data.fd = mWakeReadPipeFd; 64 result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); 65 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", 66 errno); 67 #else 68 // Add the wake pipe to the head of the request list with a null callback. 69 struct pollfd requestedFd; 70 requestedFd.fd = mWakeReadPipeFd; 71 requestedFd.events = POLLIN; 72 mRequestedFds.push(requestedFd); 73 74 Request request; 75 request.fd = mWakeReadPipeFd; 76 request.callback = NULL; 77 request.ident = 0; 78 request.data = NULL; 79 mRequests.push(request); 80 81 mPolling = false; 82 mWaiters = 0; 83 #endif 84 85 #ifdef LOOPER_STATISTICS 86 mPendingWakeTime = -1; 87 mPendingWakeCount = 0; 88 mSampledWakeCycles = 0; 89 mSampledWakeCountSum = 0; 90 mSampledWakeLatencySum = 0; 91 92 mSampledPolls = 0; 93 mSampledZeroPollCount = 0; 94 mSampledZeroPollLatencySum = 0; 95 mSampledTimeoutPollCount = 0; 96 mSampledTimeoutPollLatencySum = 0; 97 #endif 98 } 99 100 Looper::~Looper() { 101 close(mWakeReadPipeFd); 102 close(mWakeWritePipeFd); 103 #ifdef LOOPER_USES_EPOLL 104 close(mEpollFd); 105 #endif 106 } 107 108 void Looper::initTLSKey() { 109 int result = pthread_key_create(& gTLSKey, threadDestructor); 110 LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key."); 111 } 112 113 void Looper::threadDestructor(void *st) { 114 Looper* const self = static_cast<Looper*>(st); 115 if (self != NULL) { 116 self->decStrong((void*)threadDestructor); 117 } 118 } 119 120 void Looper::setForThread(const sp<Looper>& looper) { 121 sp<Looper> old = getForThread(); // also has side-effect of initializing TLS 122 123 if (looper != NULL) { 124 looper->incStrong((void*)threadDestructor); 125 } 126 127 pthread_setspecific(gTLSKey, looper.get()); 128 129 if (old != NULL) { 130 old->decStrong((void*)threadDestructor); 131 } 132 } 133 134 sp<Looper> Looper::getForThread() { 135 int result = pthread_once(& gTLSOnce, initTLSKey); 136 LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed"); 137 138 return (Looper*)pthread_getspecific(gTLSKey); 139 } 140 141 sp<Looper> Looper::prepare(int opts) { 142 bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS; 143 sp<Looper> looper = Looper::getForThread(); 144 if (looper == NULL) { 145 looper = new Looper(allowNonCallbacks); 146 Looper::setForThread(looper); 147 } 148 if (looper->getAllowNonCallbacks() != allowNonCallbacks) { 149 LOGW("Looper already prepared for this thread with a different value for the " 150 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option."); 151 } 152 return looper; 153 } 154 155 bool Looper::getAllowNonCallbacks() const { 156 return mAllowNonCallbacks; 157 } 158 159 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 160 int result = 0; 161 for (;;) { 162 while (mResponseIndex < mResponses.size()) { 163 const Response& response = mResponses.itemAt(mResponseIndex++); 164 if (! response.request.callback) { 165 #if DEBUG_POLL_AND_WAKE 166 LOGD("%p ~ pollOnce - returning signalled identifier %d: " 167 "fd=%d, events=0x%x, data=%p", this, 168 response.request.ident, response.request.fd, 169 response.events, response.request.data); 170 #endif 171 if (outFd != NULL) *outFd = response.request.fd; 172 if (outEvents != NULL) *outEvents = response.events; 173 if (outData != NULL) *outData = response.request.data; 174 return response.request.ident; 175 } 176 } 177 178 if (result != 0) { 179 #if DEBUG_POLL_AND_WAKE 180 LOGD("%p ~ pollOnce - returning result %d", this, result); 181 #endif 182 if (outFd != NULL) *outFd = 0; 183 if (outEvents != NULL) *outEvents = NULL; 184 if (outData != NULL) *outData = NULL; 185 return result; 186 } 187 188 result = pollInner(timeoutMillis); 189 } 190 } 191 192 int Looper::pollInner(int timeoutMillis) { 193 #if DEBUG_POLL_AND_WAKE 194 LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); 195 #endif 196 197 int result = ALOOPER_POLL_WAKE; 198 mResponses.clear(); 199 mResponseIndex = 0; 200 201 #ifdef LOOPER_STATISTICS 202 nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC); 203 #endif 204 205 #ifdef LOOPER_USES_EPOLL 206 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 207 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); 208 bool acquiredLock = false; 209 #else 210 // Wait for wakeAndLock() waiters to run then set mPolling to true. 211 mLock.lock(); 212 while (mWaiters != 0) { 213 mResume.wait(mLock); 214 } 215 mPolling = true; 216 mLock.unlock(); 217 218 size_t requestedCount = mRequestedFds.size(); 219 int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis); 220 #endif 221 222 if (eventCount < 0) { 223 if (errno == EINTR) { 224 goto Done; 225 } 226 227 LOGW("Poll failed with an unexpected error, errno=%d", errno); 228 result = ALOOPER_POLL_ERROR; 229 goto Done; 230 } 231 232 if (eventCount == 0) { 233 #if DEBUG_POLL_AND_WAKE 234 LOGD("%p ~ pollOnce - timeout", this); 235 #endif 236 result = ALOOPER_POLL_TIMEOUT; 237 goto Done; 238 } 239 240 #if DEBUG_POLL_AND_WAKE 241 LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); 242 #endif 243 244 #ifdef LOOPER_USES_EPOLL 245 for (int i = 0; i < eventCount; i++) { 246 int fd = eventItems[i].data.fd; 247 uint32_t epollEvents = eventItems[i].events; 248 if (fd == mWakeReadPipeFd) { 249 if (epollEvents & EPOLLIN) { 250 awoken(); 251 } else { 252 LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); 253 } 254 } else { 255 if (! acquiredLock) { 256 mLock.lock(); 257 acquiredLock = true; 258 } 259 260 ssize_t requestIndex = mRequests.indexOfKey(fd); 261 if (requestIndex >= 0) { 262 int events = 0; 263 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; 264 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; 265 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; 266 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; 267 pushResponse(events, mRequests.valueAt(requestIndex)); 268 } else { 269 LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " 270 "no longer registered.", epollEvents, fd); 271 } 272 } 273 } 274 if (acquiredLock) { 275 mLock.unlock(); 276 } 277 Done: ; 278 #else 279 for (size_t i = 0; i < requestedCount; i++) { 280 const struct pollfd& requestedFd = mRequestedFds.itemAt(i); 281 282 short pollEvents = requestedFd.revents; 283 if (pollEvents) { 284 if (requestedFd.fd == mWakeReadPipeFd) { 285 if (pollEvents & POLLIN) { 286 awoken(); 287 } else { 288 LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents); 289 } 290 } else { 291 int events = 0; 292 if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT; 293 if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT; 294 if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR; 295 if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP; 296 if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID; 297 pushResponse(events, mRequests.itemAt(i)); 298 } 299 if (--eventCount == 0) { 300 break; 301 } 302 } 303 } 304 305 Done: 306 // Set mPolling to false and wake up the wakeAndLock() waiters. 307 mLock.lock(); 308 mPolling = false; 309 if (mWaiters != 0) { 310 mAwake.broadcast(); 311 } 312 mLock.unlock(); 313 #endif 314 315 #ifdef LOOPER_STATISTICS 316 nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC); 317 mSampledPolls += 1; 318 if (timeoutMillis == 0) { 319 mSampledZeroPollCount += 1; 320 mSampledZeroPollLatencySum += pollEndTime - pollStartTime; 321 } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) { 322 mSampledTimeoutPollCount += 1; 323 mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime 324 - milliseconds_to_nanoseconds(timeoutMillis); 325 } 326 if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) { 327 LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this, 328 0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount, 329 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount); 330 mSampledPolls = 0; 331 mSampledZeroPollCount = 0; 332 mSampledZeroPollLatencySum = 0; 333 mSampledTimeoutPollCount = 0; 334 mSampledTimeoutPollLatencySum = 0; 335 } 336 #endif 337 338 for (size_t i = 0; i < mResponses.size(); i++) { 339 const Response& response = mResponses.itemAt(i); 340 if (response.request.callback) { 341 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS 342 LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this, 343 response.request.fd, response.events, response.request.data); 344 #endif 345 int callbackResult = response.request.callback( 346 response.request.fd, response.events, response.request.data); 347 if (callbackResult == 0) { 348 removeFd(response.request.fd); 349 } 350 351 result = ALOOPER_POLL_CALLBACK; 352 } 353 } 354 return result; 355 } 356 357 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 358 if (timeoutMillis <= 0) { 359 int result; 360 do { 361 result = pollOnce(timeoutMillis, outFd, outEvents, outData); 362 } while (result == ALOOPER_POLL_CALLBACK); 363 return result; 364 } else { 365 nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC) 366 + milliseconds_to_nanoseconds(timeoutMillis); 367 368 for (;;) { 369 int result = pollOnce(timeoutMillis, outFd, outEvents, outData); 370 if (result != ALOOPER_POLL_CALLBACK) { 371 return result; 372 } 373 374 nsecs_t timeoutNanos = endTime - systemTime(SYSTEM_TIME_MONOTONIC); 375 if (timeoutNanos <= 0) { 376 return ALOOPER_POLL_TIMEOUT; 377 } 378 379 timeoutMillis = int(nanoseconds_to_milliseconds(timeoutNanos + 999999LL)); 380 } 381 } 382 } 383 384 void Looper::wake() { 385 #if DEBUG_POLL_AND_WAKE 386 LOGD("%p ~ wake", this); 387 #endif 388 389 #ifdef LOOPER_STATISTICS 390 // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled. 391 if (mPendingWakeCount++ == 0) { 392 mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC); 393 } 394 #endif 395 396 ssize_t nWrite; 397 do { 398 nWrite = write(mWakeWritePipeFd, "W", 1); 399 } while (nWrite == -1 && errno == EINTR); 400 401 if (nWrite != 1) { 402 if (errno != EAGAIN) { 403 LOGW("Could not write wake signal, errno=%d", errno); 404 } 405 } 406 } 407 408 void Looper::awoken() { 409 #if DEBUG_POLL_AND_WAKE 410 LOGD("%p ~ awoken", this); 411 #endif 412 413 #ifdef LOOPER_STATISTICS 414 if (mPendingWakeCount == 0) { 415 LOGD("%p ~ awoken: spurious!", this); 416 } else { 417 mSampledWakeCycles += 1; 418 mSampledWakeCountSum += mPendingWakeCount; 419 mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime; 420 mPendingWakeCount = 0; 421 mPendingWakeTime = -1; 422 if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) { 423 LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this, 424 0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles, 425 float(mSampledWakeCountSum) / mSampledWakeCycles); 426 mSampledWakeCycles = 0; 427 mSampledWakeCountSum = 0; 428 mSampledWakeLatencySum = 0; 429 } 430 } 431 #endif 432 433 char buffer[16]; 434 ssize_t nRead; 435 do { 436 nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); 437 } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); 438 } 439 440 void Looper::pushResponse(int events, const Request& request) { 441 Response response; 442 response.events = events; 443 response.request = request; 444 mResponses.push(response); 445 } 446 447 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { 448 #if DEBUG_CALLBACKS 449 LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, 450 events, callback, data); 451 #endif 452 453 if (! callback) { 454 if (! mAllowNonCallbacks) { 455 LOGE("Invalid attempt to set NULL callback but not allowed for this looper."); 456 return -1; 457 } 458 459 if (ident < 0) { 460 LOGE("Invalid attempt to set NULL callback with ident <= 0."); 461 return -1; 462 } 463 } 464 465 #ifdef LOOPER_USES_EPOLL 466 int epollEvents = 0; 467 if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; 468 if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; 469 470 { // acquire lock 471 AutoMutex _l(mLock); 472 473 Request request; 474 request.fd = fd; 475 request.ident = ident; 476 request.callback = callback; 477 request.data = data; 478 479 struct epoll_event eventItem; 480 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 481 eventItem.events = epollEvents; 482 eventItem.data.fd = fd; 483 484 ssize_t requestIndex = mRequests.indexOfKey(fd); 485 if (requestIndex < 0) { 486 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); 487 if (epollResult < 0) { 488 LOGE("Error adding epoll events for fd %d, errno=%d", fd, errno); 489 return -1; 490 } 491 mRequests.add(fd, request); 492 } else { 493 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); 494 if (epollResult < 0) { 495 LOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); 496 return -1; 497 } 498 mRequests.replaceValueAt(requestIndex, request); 499 } 500 } // release lock 501 #else 502 int pollEvents = 0; 503 if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN; 504 if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT; 505 506 wakeAndLock(); // acquire lock 507 508 struct pollfd requestedFd; 509 requestedFd.fd = fd; 510 requestedFd.events = pollEvents; 511 512 Request request; 513 request.fd = fd; 514 request.ident = ident; 515 request.callback = callback; 516 request.data = data; 517 ssize_t index = getRequestIndexLocked(fd); 518 if (index < 0) { 519 mRequestedFds.push(requestedFd); 520 mRequests.push(request); 521 } else { 522 mRequestedFds.replaceAt(requestedFd, size_t(index)); 523 mRequests.replaceAt(request, size_t(index)); 524 } 525 526 mLock.unlock(); // release lock 527 #endif 528 return 1; 529 } 530 531 int Looper::removeFd(int fd) { 532 #if DEBUG_CALLBACKS 533 LOGD("%p ~ removeFd - fd=%d", this, fd); 534 #endif 535 536 #ifdef LOOPER_USES_EPOLL 537 { // acquire lock 538 AutoMutex _l(mLock); 539 ssize_t requestIndex = mRequests.indexOfKey(fd); 540 if (requestIndex < 0) { 541 return 0; 542 } 543 544 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL); 545 if (epollResult < 0) { 546 LOGE("Error removing epoll events for fd %d, errno=%d", fd, errno); 547 return -1; 548 } 549 550 mRequests.removeItemsAt(requestIndex); 551 } // release lock 552 return 1; 553 #else 554 wakeAndLock(); // acquire lock 555 556 ssize_t index = getRequestIndexLocked(fd); 557 if (index >= 0) { 558 mRequestedFds.removeAt(size_t(index)); 559 mRequests.removeAt(size_t(index)); 560 } 561 562 mLock.unlock(); // release lock 563 return index >= 0; 564 #endif 565 } 566 567 #ifndef LOOPER_USES_EPOLL 568 ssize_t Looper::getRequestIndexLocked(int fd) { 569 size_t requestCount = mRequestedFds.size(); 570 571 for (size_t i = 0; i < requestCount; i++) { 572 if (mRequestedFds.itemAt(i).fd == fd) { 573 return i; 574 } 575 } 576 577 return -1; 578 } 579 580 void Looper::wakeAndLock() { 581 mLock.lock(); 582 583 mWaiters += 1; 584 while (mPolling) { 585 wake(); 586 mAwake.wait(mLock); 587 } 588 589 mWaiters -= 1; 590 if (mWaiters == 0) { 591 mResume.signal(); 592 } 593 } 594 #endif 595 596 } // namespace android 597