1 // 2 // Copyright 2010 The Android Open Source Project 3 // 4 // A looper implementation based on epoll(). 5 // 6 #define LOG_TAG "Looper" 7 8 //#define LOG_NDEBUG 0 9 10 // Debugs poll and wake interactions. 11 #define DEBUG_POLL_AND_WAKE 0 12 13 // Debugs callback registration and invocation. 14 #define DEBUG_CALLBACKS 0 15 16 #include <cutils/log.h> 17 #include <utils/Looper.h> 18 #include <utils/Timers.h> 19 20 #include <unistd.h> 21 #include <fcntl.h> 22 #include <limits.h> 23 24 25 namespace android { 26 27 // --- WeakMessageHandler --- 28 29 WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) : 30 mHandler(handler) { 31 } 32 33 WeakMessageHandler::~WeakMessageHandler() { 34 } 35 36 void WeakMessageHandler::handleMessage(const Message& message) { 37 sp<MessageHandler> handler = mHandler.promote(); 38 if (handler != NULL) { 39 handler->handleMessage(message); 40 } 41 } 42 43 44 // --- SimpleLooperCallback --- 45 46 SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) : 47 mCallback(callback) { 48 } 49 50 SimpleLooperCallback::~SimpleLooperCallback() { 51 } 52 53 int SimpleLooperCallback::handleEvent(int fd, int events, void* data) { 54 return mCallback(fd, events, data); 55 } 56 57 58 // --- Looper --- 59 60 // Hint for number of file descriptors to be associated with the epoll instance. 61 static const int EPOLL_SIZE_HINT = 8; 62 63 // Maximum number of file descriptors for which to retrieve poll events each iteration. 64 static const int EPOLL_MAX_EVENTS = 16; 65 66 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; 67 static pthread_key_t gTLSKey = 0; 68 69 Looper::Looper(bool allowNonCallbacks) : 70 mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), 71 mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { 72 int wakeFds[2]; 73 int result = pipe(wakeFds); 74 LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); 75 76 mWakeReadPipeFd = wakeFds[0]; 77 mWakeWritePipeFd = wakeFds[1]; 78 79 result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); 80 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", 81 errno); 82 83 result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); 84 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", 85 errno); 86 87 mIdling = false; 88 89 // Allocate the epoll instance and register the wake pipe. 90 mEpollFd = epoll_create(EPOLL_SIZE_HINT); 91 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); 92 93 struct epoll_event eventItem; 94 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 95 eventItem.events = EPOLLIN; 96 eventItem.data.fd = mWakeReadPipeFd; 97 result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); 98 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", 99 errno); 100 } 101 102 Looper::~Looper() { 103 close(mWakeReadPipeFd); 104 close(mWakeWritePipeFd); 105 close(mEpollFd); 106 } 107 108 void Looper::initTLSKey() { 109 int result = pthread_key_create(& gTLSKey, threadDestructor); 110 LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key."); 111 } 112 113 void Looper::threadDestructor(void *st) { 114 Looper* const self = static_cast<Looper*>(st); 115 if (self != NULL) { 116 self->decStrong((void*)threadDestructor); 117 } 118 } 119 120 void Looper::setForThread(const sp<Looper>& looper) { 121 sp<Looper> old = getForThread(); // also has side-effect of initializing TLS 122 123 if (looper != NULL) { 124 looper->incStrong((void*)threadDestructor); 125 } 126 127 pthread_setspecific(gTLSKey, looper.get()); 128 129 if (old != NULL) { 130 old->decStrong((void*)threadDestructor); 131 } 132 } 133 134 sp<Looper> Looper::getForThread() { 135 int result = pthread_once(& gTLSOnce, initTLSKey); 136 LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed"); 137 138 return (Looper*)pthread_getspecific(gTLSKey); 139 } 140 141 sp<Looper> Looper::prepare(int opts) { 142 bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS; 143 sp<Looper> looper = Looper::getForThread(); 144 if (looper == NULL) { 145 looper = new Looper(allowNonCallbacks); 146 Looper::setForThread(looper); 147 } 148 if (looper->getAllowNonCallbacks() != allowNonCallbacks) { 149 ALOGW("Looper already prepared for this thread with a different value for the " 150 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option."); 151 } 152 return looper; 153 } 154 155 bool Looper::getAllowNonCallbacks() const { 156 return mAllowNonCallbacks; 157 } 158 159 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 160 int result = 0; 161 for (;;) { 162 while (mResponseIndex < mResponses.size()) { 163 const Response& response = mResponses.itemAt(mResponseIndex++); 164 int ident = response.request.ident; 165 if (ident >= 0) { 166 int fd = response.request.fd; 167 int events = response.events; 168 void* data = response.request.data; 169 #if DEBUG_POLL_AND_WAKE 170 ALOGD("%p ~ pollOnce - returning signalled identifier %d: " 171 "fd=%d, events=0x%x, data=%p", 172 this, ident, fd, events, data); 173 #endif 174 if (outFd != NULL) *outFd = fd; 175 if (outEvents != NULL) *outEvents = events; 176 if (outData != NULL) *outData = data; 177 return ident; 178 } 179 } 180 181 if (result != 0) { 182 #if DEBUG_POLL_AND_WAKE 183 ALOGD("%p ~ pollOnce - returning result %d", this, result); 184 #endif 185 if (outFd != NULL) *outFd = 0; 186 if (outEvents != NULL) *outEvents = 0; 187 if (outData != NULL) *outData = NULL; 188 return result; 189 } 190 191 result = pollInner(timeoutMillis); 192 } 193 } 194 195 int Looper::pollInner(int timeoutMillis) { 196 #if DEBUG_POLL_AND_WAKE 197 ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); 198 #endif 199 200 // Adjust the timeout based on when the next message is due. 201 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { 202 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 203 int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); 204 if (messageTimeoutMillis >= 0 205 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { 206 timeoutMillis = messageTimeoutMillis; 207 } 208 #if DEBUG_POLL_AND_WAKE 209 ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", 210 this, mNextMessageUptime - now, timeoutMillis); 211 #endif 212 } 213 214 // Poll. 215 int result = ALOOPER_POLL_WAKE; 216 mResponses.clear(); 217 mResponseIndex = 0; 218 219 // We are about to idle. 220 mIdling = true; 221 222 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 223 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); 224 225 // No longer idling. 226 mIdling = false; 227 228 // Acquire lock. 229 mLock.lock(); 230 231 // Check for poll error. 232 if (eventCount < 0) { 233 if (errno == EINTR) { 234 goto Done; 235 } 236 ALOGW("Poll failed with an unexpected error, errno=%d", errno); 237 result = ALOOPER_POLL_ERROR; 238 goto Done; 239 } 240 241 // Check for poll timeout. 242 if (eventCount == 0) { 243 #if DEBUG_POLL_AND_WAKE 244 ALOGD("%p ~ pollOnce - timeout", this); 245 #endif 246 result = ALOOPER_POLL_TIMEOUT; 247 goto Done; 248 } 249 250 // Handle all events. 251 #if DEBUG_POLL_AND_WAKE 252 ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); 253 #endif 254 255 for (int i = 0; i < eventCount; i++) { 256 int fd = eventItems[i].data.fd; 257 uint32_t epollEvents = eventItems[i].events; 258 if (fd == mWakeReadPipeFd) { 259 if (epollEvents & EPOLLIN) { 260 awoken(); 261 } else { 262 ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); 263 } 264 } else { 265 ssize_t requestIndex = mRequests.indexOfKey(fd); 266 if (requestIndex >= 0) { 267 int events = 0; 268 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; 269 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; 270 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; 271 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; 272 pushResponse(events, mRequests.valueAt(requestIndex)); 273 } else { 274 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " 275 "no longer registered.", epollEvents, fd); 276 } 277 } 278 } 279 Done: ; 280 281 // Invoke pending message callbacks. 282 mNextMessageUptime = LLONG_MAX; 283 while (mMessageEnvelopes.size() != 0) { 284 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 285 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); 286 if (messageEnvelope.uptime <= now) { 287 // Remove the envelope from the list. 288 // We keep a strong reference to the handler until the call to handleMessage 289 // finishes. Then we drop it so that the handler can be deleted *before* 290 // we reacquire our lock. 291 { // obtain handler 292 sp<MessageHandler> handler = messageEnvelope.handler; 293 Message message = messageEnvelope.message; 294 mMessageEnvelopes.removeAt(0); 295 mSendingMessage = true; 296 mLock.unlock(); 297 298 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS 299 ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", 300 this, handler.get(), message.what); 301 #endif 302 handler->handleMessage(message); 303 } // release handler 304 305 mLock.lock(); 306 mSendingMessage = false; 307 result = ALOOPER_POLL_CALLBACK; 308 } else { 309 // The last message left at the head of the queue determines the next wakeup time. 310 mNextMessageUptime = messageEnvelope.uptime; 311 break; 312 } 313 } 314 315 // Release lock. 316 mLock.unlock(); 317 318 // Invoke all response callbacks. 319 for (size_t i = 0; i < mResponses.size(); i++) { 320 Response& response = mResponses.editItemAt(i); 321 if (response.request.ident == ALOOPER_POLL_CALLBACK) { 322 int fd = response.request.fd; 323 int events = response.events; 324 void* data = response.request.data; 325 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS 326 ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", 327 this, response.request.callback.get(), fd, events, data); 328 #endif 329 int callbackResult = response.request.callback->handleEvent(fd, events, data); 330 if (callbackResult == 0) { 331 removeFd(fd); 332 } 333 // Clear the callback reference in the response structure promptly because we 334 // will not clear the response vector itself until the next poll. 335 response.request.callback.clear(); 336 result = ALOOPER_POLL_CALLBACK; 337 } 338 } 339 return result; 340 } 341 342 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 343 if (timeoutMillis <= 0) { 344 int result; 345 do { 346 result = pollOnce(timeoutMillis, outFd, outEvents, outData); 347 } while (result == ALOOPER_POLL_CALLBACK); 348 return result; 349 } else { 350 nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC) 351 + milliseconds_to_nanoseconds(timeoutMillis); 352 353 for (;;) { 354 int result = pollOnce(timeoutMillis, outFd, outEvents, outData); 355 if (result != ALOOPER_POLL_CALLBACK) { 356 return result; 357 } 358 359 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 360 timeoutMillis = toMillisecondTimeoutDelay(now, endTime); 361 if (timeoutMillis == 0) { 362 return ALOOPER_POLL_TIMEOUT; 363 } 364 } 365 } 366 } 367 368 void Looper::wake() { 369 #if DEBUG_POLL_AND_WAKE 370 ALOGD("%p ~ wake", this); 371 #endif 372 373 ssize_t nWrite; 374 do { 375 nWrite = write(mWakeWritePipeFd, "W", 1); 376 } while (nWrite == -1 && errno == EINTR); 377 378 if (nWrite != 1) { 379 if (errno != EAGAIN) { 380 ALOGW("Could not write wake signal, errno=%d", errno); 381 } 382 } 383 } 384 385 void Looper::awoken() { 386 #if DEBUG_POLL_AND_WAKE 387 ALOGD("%p ~ awoken", this); 388 #endif 389 390 char buffer[16]; 391 ssize_t nRead; 392 do { 393 nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); 394 } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); 395 } 396 397 void Looper::pushResponse(int events, const Request& request) { 398 Response response; 399 response.events = events; 400 response.request = request; 401 mResponses.push(response); 402 } 403 404 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { 405 return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data); 406 } 407 408 int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { 409 #if DEBUG_CALLBACKS 410 ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, 411 events, callback.get(), data); 412 #endif 413 414 if (!callback.get()) { 415 if (! mAllowNonCallbacks) { 416 ALOGE("Invalid attempt to set NULL callback but not allowed for this looper."); 417 return -1; 418 } 419 420 if (ident < 0) { 421 ALOGE("Invalid attempt to set NULL callback with ident < 0."); 422 return -1; 423 } 424 } else { 425 ident = ALOOPER_POLL_CALLBACK; 426 } 427 428 int epollEvents = 0; 429 if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; 430 if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; 431 432 { // acquire lock 433 AutoMutex _l(mLock); 434 435 Request request; 436 request.fd = fd; 437 request.ident = ident; 438 request.callback = callback; 439 request.data = data; 440 441 struct epoll_event eventItem; 442 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 443 eventItem.events = epollEvents; 444 eventItem.data.fd = fd; 445 446 ssize_t requestIndex = mRequests.indexOfKey(fd); 447 if (requestIndex < 0) { 448 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); 449 if (epollResult < 0) { 450 ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno); 451 return -1; 452 } 453 mRequests.add(fd, request); 454 } else { 455 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); 456 if (epollResult < 0) { 457 ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); 458 return -1; 459 } 460 mRequests.replaceValueAt(requestIndex, request); 461 } 462 } // release lock 463 return 1; 464 } 465 466 int Looper::removeFd(int fd) { 467 #if DEBUG_CALLBACKS 468 ALOGD("%p ~ removeFd - fd=%d", this, fd); 469 #endif 470 471 { // acquire lock 472 AutoMutex _l(mLock); 473 ssize_t requestIndex = mRequests.indexOfKey(fd); 474 if (requestIndex < 0) { 475 return 0; 476 } 477 478 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL); 479 if (epollResult < 0) { 480 ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno); 481 return -1; 482 } 483 484 mRequests.removeItemsAt(requestIndex); 485 } // release lock 486 return 1; 487 } 488 489 void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) { 490 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 491 sendMessageAtTime(now, handler, message); 492 } 493 494 void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler, 495 const Message& message) { 496 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 497 sendMessageAtTime(now + uptimeDelay, handler, message); 498 } 499 500 void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, 501 const Message& message) { 502 #if DEBUG_CALLBACKS 503 ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d", 504 this, uptime, handler.get(), message.what); 505 #endif 506 507 size_t i = 0; 508 { // acquire lock 509 AutoMutex _l(mLock); 510 511 size_t messageCount = mMessageEnvelopes.size(); 512 while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { 513 i += 1; 514 } 515 516 MessageEnvelope messageEnvelope(uptime, handler, message); 517 mMessageEnvelopes.insertAt(messageEnvelope, i, 1); 518 519 // Optimization: If the Looper is currently sending a message, then we can skip 520 // the call to wake() because the next thing the Looper will do after processing 521 // messages is to decide when the next wakeup time should be. In fact, it does 522 // not even matter whether this code is running on the Looper thread. 523 if (mSendingMessage) { 524 return; 525 } 526 } // release lock 527 528 // Wake the poll loop only when we enqueue a new message at the head. 529 if (i == 0) { 530 wake(); 531 } 532 } 533 534 void Looper::removeMessages(const sp<MessageHandler>& handler) { 535 #if DEBUG_CALLBACKS 536 ALOGD("%p ~ removeMessages - handler=%p", this, handler.get()); 537 #endif 538 539 { // acquire lock 540 AutoMutex _l(mLock); 541 542 for (size_t i = mMessageEnvelopes.size(); i != 0; ) { 543 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); 544 if (messageEnvelope.handler == handler) { 545 mMessageEnvelopes.removeAt(i); 546 } 547 } 548 } // release lock 549 } 550 551 void Looper::removeMessages(const sp<MessageHandler>& handler, int what) { 552 #if DEBUG_CALLBACKS 553 ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what); 554 #endif 555 556 { // acquire lock 557 AutoMutex _l(mLock); 558 559 for (size_t i = mMessageEnvelopes.size(); i != 0; ) { 560 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); 561 if (messageEnvelope.handler == handler 562 && messageEnvelope.message.what == what) { 563 mMessageEnvelopes.removeAt(i); 564 } 565 } 566 } // release lock 567 } 568 569 bool Looper::isIdling() const { 570 return mIdling; 571 } 572 573 } // namespace android 574