1 // 2 // Copyright 2010 The Android Open Source Project 3 // 4 // A looper implementation based on epoll(). 5 // 6 #define LOG_TAG "Looper" 7 8 //#define LOG_NDEBUG 0 9 10 // Debugs poll and wake interactions. 11 #define DEBUG_POLL_AND_WAKE 0 12 13 // Debugs callback registration and invocation. 14 #define DEBUG_CALLBACKS 0 15 16 #include <cutils/log.h> 17 #include <utils/Looper.h> 18 #include <utils/Timers.h> 19 20 #include <unistd.h> 21 #include <fcntl.h> 22 #include <limits.h> 23 24 25 namespace android { 26 27 // --- WeakMessageHandler --- 28 29 WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) : 30 mHandler(handler) { 31 } 32 33 WeakMessageHandler::~WeakMessageHandler() { 34 } 35 36 void WeakMessageHandler::handleMessage(const Message& message) { 37 sp<MessageHandler> handler = mHandler.promote(); 38 if (handler != NULL) { 39 handler->handleMessage(message); 40 } 41 } 42 43 44 // --- SimpleLooperCallback --- 45 46 SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) : 47 mCallback(callback) { 48 } 49 50 SimpleLooperCallback::~SimpleLooperCallback() { 51 } 52 53 int SimpleLooperCallback::handleEvent(int fd, int events, void* data) { 54 return mCallback(fd, events, data); 55 } 56 57 58 // --- Looper --- 59 60 // Hint for number of file descriptors to be associated with the epoll instance. 61 static const int EPOLL_SIZE_HINT = 8; 62 63 // Maximum number of file descriptors for which to retrieve poll events each iteration. 64 static const int EPOLL_MAX_EVENTS = 16; 65 66 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; 67 static pthread_key_t gTLSKey = 0; 68 69 Looper::Looper(bool allowNonCallbacks) : 70 mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), 71 mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { 72 int wakeFds[2]; 73 int result = pipe(wakeFds); 74 LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); 75 76 mWakeReadPipeFd = wakeFds[0]; 77 mWakeWritePipeFd = wakeFds[1]; 78 79 result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); 80 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", 81 errno); 82 83 result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); 84 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", 85 errno); 86 87 // Allocate the epoll instance and register the wake pipe. 88 mEpollFd = epoll_create(EPOLL_SIZE_HINT); 89 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); 90 91 struct epoll_event eventItem; 92 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 93 eventItem.events = EPOLLIN; 94 eventItem.data.fd = mWakeReadPipeFd; 95 result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); 96 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", 97 errno); 98 } 99 100 Looper::~Looper() { 101 close(mWakeReadPipeFd); 102 close(mWakeWritePipeFd); 103 close(mEpollFd); 104 } 105 106 void Looper::initTLSKey() { 107 int result = pthread_key_create(& gTLSKey, threadDestructor); 108 LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key."); 109 } 110 111 void Looper::threadDestructor(void *st) { 112 Looper* const self = static_cast<Looper*>(st); 113 if (self != NULL) { 114 self->decStrong((void*)threadDestructor); 115 } 116 } 117 118 void Looper::setForThread(const sp<Looper>& looper) { 119 sp<Looper> old = getForThread(); // also has side-effect of initializing TLS 120 121 if (looper != NULL) { 122 looper->incStrong((void*)threadDestructor); 123 } 124 125 pthread_setspecific(gTLSKey, looper.get()); 126 127 if (old != NULL) { 128 old->decStrong((void*)threadDestructor); 129 } 130 } 131 132 sp<Looper> Looper::getForThread() { 133 int result = pthread_once(& gTLSOnce, initTLSKey); 134 LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed"); 135 136 return (Looper*)pthread_getspecific(gTLSKey); 137 } 138 139 sp<Looper> Looper::prepare(int opts) { 140 bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS; 141 sp<Looper> looper = Looper::getForThread(); 142 if (looper == NULL) { 143 looper = new Looper(allowNonCallbacks); 144 Looper::setForThread(looper); 145 } 146 if (looper->getAllowNonCallbacks() != allowNonCallbacks) { 147 ALOGW("Looper already prepared for this thread with a different value for the " 148 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option."); 149 } 150 return looper; 151 } 152 153 bool Looper::getAllowNonCallbacks() const { 154 return mAllowNonCallbacks; 155 } 156 157 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 158 int result = 0; 159 for (;;) { 160 while (mResponseIndex < mResponses.size()) { 161 const Response& response = mResponses.itemAt(mResponseIndex++); 162 int ident = response.request.ident; 163 if (ident >= 0) { 164 int fd = response.request.fd; 165 int events = response.events; 166 void* data = response.request.data; 167 #if DEBUG_POLL_AND_WAKE 168 ALOGD("%p ~ pollOnce - returning signalled identifier %d: " 169 "fd=%d, events=0x%x, data=%p", 170 this, ident, fd, events, data); 171 #endif 172 if (outFd != NULL) *outFd = fd; 173 if (outEvents != NULL) *outEvents = events; 174 if (outData != NULL) *outData = data; 175 return ident; 176 } 177 } 178 179 if (result != 0) { 180 #if DEBUG_POLL_AND_WAKE 181 ALOGD("%p ~ pollOnce - returning result %d", this, result); 182 #endif 183 if (outFd != NULL) *outFd = 0; 184 if (outEvents != NULL) *outEvents = 0; 185 if (outData != NULL) *outData = NULL; 186 return result; 187 } 188 189 result = pollInner(timeoutMillis); 190 } 191 } 192 193 int Looper::pollInner(int timeoutMillis) { 194 #if DEBUG_POLL_AND_WAKE 195 ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); 196 #endif 197 198 // Adjust the timeout based on when the next message is due. 199 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { 200 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 201 int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); 202 if (messageTimeoutMillis >= 0 203 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { 204 timeoutMillis = messageTimeoutMillis; 205 } 206 #if DEBUG_POLL_AND_WAKE 207 ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", 208 this, mNextMessageUptime - now, timeoutMillis); 209 #endif 210 } 211 212 // Poll. 213 int result = ALOOPER_POLL_WAKE; 214 mResponses.clear(); 215 mResponseIndex = 0; 216 217 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 218 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); 219 220 // Acquire lock. 221 mLock.lock(); 222 223 // Check for poll error. 224 if (eventCount < 0) { 225 if (errno == EINTR) { 226 goto Done; 227 } 228 ALOGW("Poll failed with an unexpected error, errno=%d", errno); 229 result = ALOOPER_POLL_ERROR; 230 goto Done; 231 } 232 233 // Check for poll timeout. 234 if (eventCount == 0) { 235 #if DEBUG_POLL_AND_WAKE 236 ALOGD("%p ~ pollOnce - timeout", this); 237 #endif 238 result = ALOOPER_POLL_TIMEOUT; 239 goto Done; 240 } 241 242 // Handle all events. 243 #if DEBUG_POLL_AND_WAKE 244 ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); 245 #endif 246 247 for (int i = 0; i < eventCount; i++) { 248 int fd = eventItems[i].data.fd; 249 uint32_t epollEvents = eventItems[i].events; 250 if (fd == mWakeReadPipeFd) { 251 if (epollEvents & EPOLLIN) { 252 awoken(); 253 } else { 254 ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); 255 } 256 } else { 257 ssize_t requestIndex = mRequests.indexOfKey(fd); 258 if (requestIndex >= 0) { 259 int events = 0; 260 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; 261 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; 262 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; 263 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; 264 pushResponse(events, mRequests.valueAt(requestIndex)); 265 } else { 266 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " 267 "no longer registered.", epollEvents, fd); 268 } 269 } 270 } 271 Done: ; 272 273 // Invoke pending message callbacks. 274 mNextMessageUptime = LLONG_MAX; 275 while (mMessageEnvelopes.size() != 0) { 276 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 277 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); 278 if (messageEnvelope.uptime <= now) { 279 // Remove the envelope from the list. 280 // We keep a strong reference to the handler until the call to handleMessage 281 // finishes. Then we drop it so that the handler can be deleted *before* 282 // we reacquire our lock. 283 { // obtain handler 284 sp<MessageHandler> handler = messageEnvelope.handler; 285 Message message = messageEnvelope.message; 286 mMessageEnvelopes.removeAt(0); 287 mSendingMessage = true; 288 mLock.unlock(); 289 290 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS 291 ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", 292 this, handler.get(), message.what); 293 #endif 294 handler->handleMessage(message); 295 } // release handler 296 297 mLock.lock(); 298 mSendingMessage = false; 299 result = ALOOPER_POLL_CALLBACK; 300 } else { 301 // The last message left at the head of the queue determines the next wakeup time. 302 mNextMessageUptime = messageEnvelope.uptime; 303 break; 304 } 305 } 306 307 // Release lock. 308 mLock.unlock(); 309 310 // Invoke all response callbacks. 311 for (size_t i = 0; i < mResponses.size(); i++) { 312 Response& response = mResponses.editItemAt(i); 313 if (response.request.ident == ALOOPER_POLL_CALLBACK) { 314 int fd = response.request.fd; 315 int events = response.events; 316 void* data = response.request.data; 317 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS 318 ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", 319 this, response.request.callback.get(), fd, events, data); 320 #endif 321 int callbackResult = response.request.callback->handleEvent(fd, events, data); 322 if (callbackResult == 0) { 323 removeFd(fd); 324 } 325 // Clear the callback reference in the response structure promptly because we 326 // will not clear the response vector itself until the next poll. 327 response.request.callback.clear(); 328 result = ALOOPER_POLL_CALLBACK; 329 } 330 } 331 return result; 332 } 333 334 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 335 if (timeoutMillis <= 0) { 336 int result; 337 do { 338 result = pollOnce(timeoutMillis, outFd, outEvents, outData); 339 } while (result == ALOOPER_POLL_CALLBACK); 340 return result; 341 } else { 342 nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC) 343 + milliseconds_to_nanoseconds(timeoutMillis); 344 345 for (;;) { 346 int result = pollOnce(timeoutMillis, outFd, outEvents, outData); 347 if (result != ALOOPER_POLL_CALLBACK) { 348 return result; 349 } 350 351 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 352 timeoutMillis = toMillisecondTimeoutDelay(now, endTime); 353 if (timeoutMillis == 0) { 354 return ALOOPER_POLL_TIMEOUT; 355 } 356 } 357 } 358 } 359 360 void Looper::wake() { 361 #if DEBUG_POLL_AND_WAKE 362 ALOGD("%p ~ wake", this); 363 #endif 364 365 ssize_t nWrite; 366 do { 367 nWrite = write(mWakeWritePipeFd, "W", 1); 368 } while (nWrite == -1 && errno == EINTR); 369 370 if (nWrite != 1) { 371 if (errno != EAGAIN) { 372 ALOGW("Could not write wake signal, errno=%d", errno); 373 } 374 } 375 } 376 377 void Looper::awoken() { 378 #if DEBUG_POLL_AND_WAKE 379 ALOGD("%p ~ awoken", this); 380 #endif 381 382 char buffer[16]; 383 ssize_t nRead; 384 do { 385 nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); 386 } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); 387 } 388 389 void Looper::pushResponse(int events, const Request& request) { 390 Response response; 391 response.events = events; 392 response.request = request; 393 mResponses.push(response); 394 } 395 396 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { 397 return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data); 398 } 399 400 int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { 401 #if DEBUG_CALLBACKS 402 ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, 403 events, callback.get(), data); 404 #endif 405 406 if (!callback.get()) { 407 if (! mAllowNonCallbacks) { 408 ALOGE("Invalid attempt to set NULL callback but not allowed for this looper."); 409 return -1; 410 } 411 412 if (ident < 0) { 413 ALOGE("Invalid attempt to set NULL callback with ident < 0."); 414 return -1; 415 } 416 } else { 417 ident = ALOOPER_POLL_CALLBACK; 418 } 419 420 int epollEvents = 0; 421 if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; 422 if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; 423 424 { // acquire lock 425 AutoMutex _l(mLock); 426 427 Request request; 428 request.fd = fd; 429 request.ident = ident; 430 request.callback = callback; 431 request.data = data; 432 433 struct epoll_event eventItem; 434 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 435 eventItem.events = epollEvents; 436 eventItem.data.fd = fd; 437 438 ssize_t requestIndex = mRequests.indexOfKey(fd); 439 if (requestIndex < 0) { 440 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); 441 if (epollResult < 0) { 442 ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno); 443 return -1; 444 } 445 mRequests.add(fd, request); 446 } else { 447 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); 448 if (epollResult < 0) { 449 ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); 450 return -1; 451 } 452 mRequests.replaceValueAt(requestIndex, request); 453 } 454 } // release lock 455 return 1; 456 } 457 458 int Looper::removeFd(int fd) { 459 #if DEBUG_CALLBACKS 460 ALOGD("%p ~ removeFd - fd=%d", this, fd); 461 #endif 462 463 { // acquire lock 464 AutoMutex _l(mLock); 465 ssize_t requestIndex = mRequests.indexOfKey(fd); 466 if (requestIndex < 0) { 467 return 0; 468 } 469 470 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL); 471 if (epollResult < 0) { 472 ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno); 473 return -1; 474 } 475 476 mRequests.removeItemsAt(requestIndex); 477 } // release lock 478 return 1; 479 } 480 481 void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) { 482 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 483 sendMessageAtTime(now, handler, message); 484 } 485 486 void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler, 487 const Message& message) { 488 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 489 sendMessageAtTime(now + uptimeDelay, handler, message); 490 } 491 492 void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, 493 const Message& message) { 494 #if DEBUG_CALLBACKS 495 ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d", 496 this, uptime, handler.get(), message.what); 497 #endif 498 499 size_t i = 0; 500 { // acquire lock 501 AutoMutex _l(mLock); 502 503 size_t messageCount = mMessageEnvelopes.size(); 504 while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { 505 i += 1; 506 } 507 508 MessageEnvelope messageEnvelope(uptime, handler, message); 509 mMessageEnvelopes.insertAt(messageEnvelope, i, 1); 510 511 // Optimization: If the Looper is currently sending a message, then we can skip 512 // the call to wake() because the next thing the Looper will do after processing 513 // messages is to decide when the next wakeup time should be. In fact, it does 514 // not even matter whether this code is running on the Looper thread. 515 if (mSendingMessage) { 516 return; 517 } 518 } // release lock 519 520 // Wake the poll loop only when we enqueue a new message at the head. 521 if (i == 0) { 522 wake(); 523 } 524 } 525 526 void Looper::removeMessages(const sp<MessageHandler>& handler) { 527 #if DEBUG_CALLBACKS 528 ALOGD("%p ~ removeMessages - handler=%p", this, handler.get()); 529 #endif 530 531 { // acquire lock 532 AutoMutex _l(mLock); 533 534 for (size_t i = mMessageEnvelopes.size(); i != 0; ) { 535 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); 536 if (messageEnvelope.handler == handler) { 537 mMessageEnvelopes.removeAt(i); 538 } 539 } 540 } // release lock 541 } 542 543 void Looper::removeMessages(const sp<MessageHandler>& handler, int what) { 544 #if DEBUG_CALLBACKS 545 ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what); 546 #endif 547 548 { // acquire lock 549 AutoMutex _l(mLock); 550 551 for (size_t i = mMessageEnvelopes.size(); i != 0; ) { 552 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); 553 if (messageEnvelope.handler == handler 554 && messageEnvelope.message.what == what) { 555 mMessageEnvelopes.removeAt(i); 556 } 557 } 558 } // release lock 559 } 560 561 } // namespace android 562