Home | History | Annotate | Download | only in utils
      1 //
      2 // Copyright 2010 The Android Open Source Project
      3 //
      4 // A looper implementation based on epoll().
      5 //
      6 #define LOG_TAG "Looper"
      7 
      8 //#define LOG_NDEBUG 0
      9 
     10 // Debugs poll and wake interactions.
     11 #define DEBUG_POLL_AND_WAKE 0
     12 
     13 // Debugs callback registration and invocation.
     14 #define DEBUG_CALLBACKS 0
     15 
     16 #include <cutils/log.h>
     17 #include <utils/Looper.h>
     18 #include <utils/Timers.h>
     19 
     20 #include <unistd.h>
     21 #include <fcntl.h>
     22 #include <limits.h>
     23 
     24 
     25 namespace android {
     26 
     27 // --- WeakMessageHandler ---
     28 
     29 WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
     30         mHandler(handler) {
     31 }
     32 
     33 WeakMessageHandler::~WeakMessageHandler() {
     34 }
     35 
     36 void WeakMessageHandler::handleMessage(const Message& message) {
     37     sp<MessageHandler> handler = mHandler.promote();
     38     if (handler != NULL) {
     39         handler->handleMessage(message);
     40     }
     41 }
     42 
     43 
     44 // --- SimpleLooperCallback ---
     45 
     46 SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) :
     47         mCallback(callback) {
     48 }
     49 
     50 SimpleLooperCallback::~SimpleLooperCallback() {
     51 }
     52 
     53 int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
     54     return mCallback(fd, events, data);
     55 }
     56 
     57 
     58 // --- Looper ---
     59 
     60 // Hint for number of file descriptors to be associated with the epoll instance.
     61 static const int EPOLL_SIZE_HINT = 8;
     62 
     63 // Maximum number of file descriptors for which to retrieve poll events each iteration.
     64 static const int EPOLL_MAX_EVENTS = 16;
     65 
     66 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
     67 static pthread_key_t gTLSKey = 0;
     68 
     69 Looper::Looper(bool allowNonCallbacks) :
     70         mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
     71         mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
     72     int wakeFds[2];
     73     int result = pipe(wakeFds);
     74     LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
     75 
     76     mWakeReadPipeFd = wakeFds[0];
     77     mWakeWritePipeFd = wakeFds[1];
     78 
     79     result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
     80     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
     81             errno);
     82 
     83     result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
     84     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
     85             errno);
     86 
     87     // Allocate the epoll instance and register the wake pipe.
     88     mEpollFd = epoll_create(EPOLL_SIZE_HINT);
     89     LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
     90 
     91     struct epoll_event eventItem;
     92     memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
     93     eventItem.events = EPOLLIN;
     94     eventItem.data.fd = mWakeReadPipeFd;
     95     result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
     96     LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
     97             errno);
     98 }
     99 
    100 Looper::~Looper() {
    101     close(mWakeReadPipeFd);
    102     close(mWakeWritePipeFd);
    103     close(mEpollFd);
    104 }
    105 
    106 void Looper::initTLSKey() {
    107     int result = pthread_key_create(& gTLSKey, threadDestructor);
    108     LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
    109 }
    110 
    111 void Looper::threadDestructor(void *st) {
    112     Looper* const self = static_cast<Looper*>(st);
    113     if (self != NULL) {
    114         self->decStrong((void*)threadDestructor);
    115     }
    116 }
    117 
    118 void Looper::setForThread(const sp<Looper>& looper) {
    119     sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
    120 
    121     if (looper != NULL) {
    122         looper->incStrong((void*)threadDestructor);
    123     }
    124 
    125     pthread_setspecific(gTLSKey, looper.get());
    126 
    127     if (old != NULL) {
    128         old->decStrong((void*)threadDestructor);
    129     }
    130 }
    131 
    132 sp<Looper> Looper::getForThread() {
    133     int result = pthread_once(& gTLSOnce, initTLSKey);
    134     LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
    135 
    136     return (Looper*)pthread_getspecific(gTLSKey);
    137 }
    138 
    139 sp<Looper> Looper::prepare(int opts) {
    140     bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
    141     sp<Looper> looper = Looper::getForThread();
    142     if (looper == NULL) {
    143         looper = new Looper(allowNonCallbacks);
    144         Looper::setForThread(looper);
    145     }
    146     if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
    147         ALOGW("Looper already prepared for this thread with a different value for the "
    148                 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
    149     }
    150     return looper;
    151 }
    152 
    153 bool Looper::getAllowNonCallbacks() const {
    154     return mAllowNonCallbacks;
    155 }
    156 
    157 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    158     int result = 0;
    159     for (;;) {
    160         while (mResponseIndex < mResponses.size()) {
    161             const Response& response = mResponses.itemAt(mResponseIndex++);
    162             int ident = response.request.ident;
    163             if (ident >= 0) {
    164                 int fd = response.request.fd;
    165                 int events = response.events;
    166                 void* data = response.request.data;
    167 #if DEBUG_POLL_AND_WAKE
    168                 ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
    169                         "fd=%d, events=0x%x, data=%p",
    170                         this, ident, fd, events, data);
    171 #endif
    172                 if (outFd != NULL) *outFd = fd;
    173                 if (outEvents != NULL) *outEvents = events;
    174                 if (outData != NULL) *outData = data;
    175                 return ident;
    176             }
    177         }
    178 
    179         if (result != 0) {
    180 #if DEBUG_POLL_AND_WAKE
    181             ALOGD("%p ~ pollOnce - returning result %d", this, result);
    182 #endif
    183             if (outFd != NULL) *outFd = 0;
    184             if (outEvents != NULL) *outEvents = 0;
    185             if (outData != NULL) *outData = NULL;
    186             return result;
    187         }
    188 
    189         result = pollInner(timeoutMillis);
    190     }
    191 }
    192 
    193 int Looper::pollInner(int timeoutMillis) {
    194 #if DEBUG_POLL_AND_WAKE
    195     ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
    196 #endif
    197 
    198     // Adjust the timeout based on when the next message is due.
    199     if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
    200         nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    201         int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
    202         if (messageTimeoutMillis >= 0
    203                 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
    204             timeoutMillis = messageTimeoutMillis;
    205         }
    206 #if DEBUG_POLL_AND_WAKE
    207         ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
    208                 this, mNextMessageUptime - now, timeoutMillis);
    209 #endif
    210     }
    211 
    212     // Poll.
    213     int result = ALOOPER_POLL_WAKE;
    214     mResponses.clear();
    215     mResponseIndex = 0;
    216 
    217     struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    218     int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    219 
    220     // Acquire lock.
    221     mLock.lock();
    222 
    223     // Check for poll error.
    224     if (eventCount < 0) {
    225         if (errno == EINTR) {
    226             goto Done;
    227         }
    228         ALOGW("Poll failed with an unexpected error, errno=%d", errno);
    229         result = ALOOPER_POLL_ERROR;
    230         goto Done;
    231     }
    232 
    233     // Check for poll timeout.
    234     if (eventCount == 0) {
    235 #if DEBUG_POLL_AND_WAKE
    236         ALOGD("%p ~ pollOnce - timeout", this);
    237 #endif
    238         result = ALOOPER_POLL_TIMEOUT;
    239         goto Done;
    240     }
    241 
    242     // Handle all events.
    243 #if DEBUG_POLL_AND_WAKE
    244     ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
    245 #endif
    246 
    247     for (int i = 0; i < eventCount; i++) {
    248         int fd = eventItems[i].data.fd;
    249         uint32_t epollEvents = eventItems[i].events;
    250         if (fd == mWakeReadPipeFd) {
    251             if (epollEvents & EPOLLIN) {
    252                 awoken();
    253             } else {
    254                 ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
    255             }
    256         } else {
    257             ssize_t requestIndex = mRequests.indexOfKey(fd);
    258             if (requestIndex >= 0) {
    259                 int events = 0;
    260                 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
    261                 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
    262                 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
    263                 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
    264                 pushResponse(events, mRequests.valueAt(requestIndex));
    265             } else {
    266                 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
    267                         "no longer registered.", epollEvents, fd);
    268             }
    269         }
    270     }
    271 Done: ;
    272 
    273     // Invoke pending message callbacks.
    274     mNextMessageUptime = LLONG_MAX;
    275     while (mMessageEnvelopes.size() != 0) {
    276         nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    277         const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
    278         if (messageEnvelope.uptime <= now) {
    279             // Remove the envelope from the list.
    280             // We keep a strong reference to the handler until the call to handleMessage
    281             // finishes.  Then we drop it so that the handler can be deleted *before*
    282             // we reacquire our lock.
    283             { // obtain handler
    284                 sp<MessageHandler> handler = messageEnvelope.handler;
    285                 Message message = messageEnvelope.message;
    286                 mMessageEnvelopes.removeAt(0);
    287                 mSendingMessage = true;
    288                 mLock.unlock();
    289 
    290 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
    291                 ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
    292                         this, handler.get(), message.what);
    293 #endif
    294                 handler->handleMessage(message);
    295             } // release handler
    296 
    297             mLock.lock();
    298             mSendingMessage = false;
    299             result = ALOOPER_POLL_CALLBACK;
    300         } else {
    301             // The last message left at the head of the queue determines the next wakeup time.
    302             mNextMessageUptime = messageEnvelope.uptime;
    303             break;
    304         }
    305     }
    306 
    307     // Release lock.
    308     mLock.unlock();
    309 
    310     // Invoke all response callbacks.
    311     for (size_t i = 0; i < mResponses.size(); i++) {
    312         Response& response = mResponses.editItemAt(i);
    313         if (response.request.ident == ALOOPER_POLL_CALLBACK) {
    314             int fd = response.request.fd;
    315             int events = response.events;
    316             void* data = response.request.data;
    317 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
    318             ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
    319                     this, response.request.callback.get(), fd, events, data);
    320 #endif
    321             int callbackResult = response.request.callback->handleEvent(fd, events, data);
    322             if (callbackResult == 0) {
    323                 removeFd(fd);
    324             }
    325             // Clear the callback reference in the response structure promptly because we
    326             // will not clear the response vector itself until the next poll.
    327             response.request.callback.clear();
    328             result = ALOOPER_POLL_CALLBACK;
    329         }
    330     }
    331     return result;
    332 }
    333 
    334 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    335     if (timeoutMillis <= 0) {
    336         int result;
    337         do {
    338             result = pollOnce(timeoutMillis, outFd, outEvents, outData);
    339         } while (result == ALOOPER_POLL_CALLBACK);
    340         return result;
    341     } else {
    342         nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
    343                 + milliseconds_to_nanoseconds(timeoutMillis);
    344 
    345         for (;;) {
    346             int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
    347             if (result != ALOOPER_POLL_CALLBACK) {
    348                 return result;
    349             }
    350 
    351             nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    352             timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
    353             if (timeoutMillis == 0) {
    354                 return ALOOPER_POLL_TIMEOUT;
    355             }
    356         }
    357     }
    358 }
    359 
    360 void Looper::wake() {
    361 #if DEBUG_POLL_AND_WAKE
    362     ALOGD("%p ~ wake", this);
    363 #endif
    364 
    365     ssize_t nWrite;
    366     do {
    367         nWrite = write(mWakeWritePipeFd, "W", 1);
    368     } while (nWrite == -1 && errno == EINTR);
    369 
    370     if (nWrite != 1) {
    371         if (errno != EAGAIN) {
    372             ALOGW("Could not write wake signal, errno=%d", errno);
    373         }
    374     }
    375 }
    376 
    377 void Looper::awoken() {
    378 #if DEBUG_POLL_AND_WAKE
    379     ALOGD("%p ~ awoken", this);
    380 #endif
    381 
    382     char buffer[16];
    383     ssize_t nRead;
    384     do {
    385         nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    386     } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
    387 }
    388 
    389 void Looper::pushResponse(int events, const Request& request) {
    390     Response response;
    391     response.events = events;
    392     response.request = request;
    393     mResponses.push(response);
    394 }
    395 
    396 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
    397     return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
    398 }
    399 
    400 int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    401 #if DEBUG_CALLBACKS
    402     ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
    403             events, callback.get(), data);
    404 #endif
    405 
    406     if (!callback.get()) {
    407         if (! mAllowNonCallbacks) {
    408             ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
    409             return -1;
    410         }
    411 
    412         if (ident < 0) {
    413             ALOGE("Invalid attempt to set NULL callback with ident < 0.");
    414             return -1;
    415         }
    416     } else {
    417         ident = ALOOPER_POLL_CALLBACK;
    418     }
    419 
    420     int epollEvents = 0;
    421     if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
    422     if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
    423 
    424     { // acquire lock
    425         AutoMutex _l(mLock);
    426 
    427         Request request;
    428         request.fd = fd;
    429         request.ident = ident;
    430         request.callback = callback;
    431         request.data = data;
    432 
    433         struct epoll_event eventItem;
    434         memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    435         eventItem.events = epollEvents;
    436         eventItem.data.fd = fd;
    437 
    438         ssize_t requestIndex = mRequests.indexOfKey(fd);
    439         if (requestIndex < 0) {
    440             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
    441             if (epollResult < 0) {
    442                 ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
    443                 return -1;
    444             }
    445             mRequests.add(fd, request);
    446         } else {
    447             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
    448             if (epollResult < 0) {
    449                 ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
    450                 return -1;
    451             }
    452             mRequests.replaceValueAt(requestIndex, request);
    453         }
    454     } // release lock
    455     return 1;
    456 }
    457 
    458 int Looper::removeFd(int fd) {
    459 #if DEBUG_CALLBACKS
    460     ALOGD("%p ~ removeFd - fd=%d", this, fd);
    461 #endif
    462 
    463     { // acquire lock
    464         AutoMutex _l(mLock);
    465         ssize_t requestIndex = mRequests.indexOfKey(fd);
    466         if (requestIndex < 0) {
    467             return 0;
    468         }
    469 
    470         int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
    471         if (epollResult < 0) {
    472             ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
    473             return -1;
    474         }
    475 
    476         mRequests.removeItemsAt(requestIndex);
    477     } // release lock
    478     return 1;
    479 }
    480 
    481 void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
    482     nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    483     sendMessageAtTime(now, handler, message);
    484 }
    485 
    486 void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
    487         const Message& message) {
    488     nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    489     sendMessageAtTime(now + uptimeDelay, handler, message);
    490 }
    491 
    492 void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
    493         const Message& message) {
    494 #if DEBUG_CALLBACKS
    495     ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
    496             this, uptime, handler.get(), message.what);
    497 #endif
    498 
    499     size_t i = 0;
    500     { // acquire lock
    501         AutoMutex _l(mLock);
    502 
    503         size_t messageCount = mMessageEnvelopes.size();
    504         while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
    505             i += 1;
    506         }
    507 
    508         MessageEnvelope messageEnvelope(uptime, handler, message);
    509         mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
    510 
    511         // Optimization: If the Looper is currently sending a message, then we can skip
    512         // the call to wake() because the next thing the Looper will do after processing
    513         // messages is to decide when the next wakeup time should be.  In fact, it does
    514         // not even matter whether this code is running on the Looper thread.
    515         if (mSendingMessage) {
    516             return;
    517         }
    518     } // release lock
    519 
    520     // Wake the poll loop only when we enqueue a new message at the head.
    521     if (i == 0) {
    522         wake();
    523     }
    524 }
    525 
    526 void Looper::removeMessages(const sp<MessageHandler>& handler) {
    527 #if DEBUG_CALLBACKS
    528     ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
    529 #endif
    530 
    531     { // acquire lock
    532         AutoMutex _l(mLock);
    533 
    534         for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
    535             const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
    536             if (messageEnvelope.handler == handler) {
    537                 mMessageEnvelopes.removeAt(i);
    538             }
    539         }
    540     } // release lock
    541 }
    542 
    543 void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
    544 #if DEBUG_CALLBACKS
    545     ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
    546 #endif
    547 
    548     { // acquire lock
    549         AutoMutex _l(mLock);
    550 
    551         for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
    552             const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
    553             if (messageEnvelope.handler == handler
    554                     && messageEnvelope.message.what == what) {
    555                 mMessageEnvelopes.removeAt(i);
    556             }
    557         }
    558     } // release lock
    559 }
    560 
    561 } // namespace android
    562