Home | History | Annotate | Download | only in utils
      1 //
      2 // Copyright 2010 The Android Open Source Project
      3 //
      4 // A looper implementation based on epoll().
      5 //
      6 #define LOG_TAG "Looper"
      7 
      8 //#define LOG_NDEBUG 0
      9 
     10 // Debugs poll and wake interactions.
     11 #define DEBUG_POLL_AND_WAKE 0
     12 
     13 // Debugs callback registration and invocation.
     14 #define DEBUG_CALLBACKS 0
     15 
     16 #include <cutils/log.h>
     17 #include <utils/Looper.h>
     18 #include <utils/Timers.h>
     19 
     20 #include <unistd.h>
     21 #include <fcntl.h>
     22 
     23 
     24 namespace android {
     25 
     26 #ifdef LOOPER_USES_EPOLL
     27 // Hint for number of file descriptors to be associated with the epoll instance.
     28 static const int EPOLL_SIZE_HINT = 8;
     29 
     30 // Maximum number of file descriptors for which to retrieve poll events each iteration.
     31 static const int EPOLL_MAX_EVENTS = 16;
     32 #endif
     33 
     34 static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
     35 static pthread_key_t gTLSKey = 0;
     36 
     37 Looper::Looper(bool allowNonCallbacks) :
     38         mAllowNonCallbacks(allowNonCallbacks),
     39         mResponseIndex(0) {
     40     int wakeFds[2];
     41     int result = pipe(wakeFds);
     42     LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
     43 
     44     mWakeReadPipeFd = wakeFds[0];
     45     mWakeWritePipeFd = wakeFds[1];
     46 
     47     result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
     48     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
     49             errno);
     50 
     51     result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
     52     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
     53             errno);
     54 
     55 #ifdef LOOPER_USES_EPOLL
     56     // Allocate the epoll instance and register the wake pipe.
     57     mEpollFd = epoll_create(EPOLL_SIZE_HINT);
     58     LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
     59 
     60     struct epoll_event eventItem;
     61     memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
     62     eventItem.events = EPOLLIN;
     63     eventItem.data.fd = mWakeReadPipeFd;
     64     result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
     65     LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
     66             errno);
     67 #else
     68     // Add the wake pipe to the head of the request list with a null callback.
     69     struct pollfd requestedFd;
     70     requestedFd.fd = mWakeReadPipeFd;
     71     requestedFd.events = POLLIN;
     72     mRequestedFds.push(requestedFd);
     73 
     74     Request request;
     75     request.fd = mWakeReadPipeFd;
     76     request.callback = NULL;
     77     request.ident = 0;
     78     request.data = NULL;
     79     mRequests.push(request);
     80 
     81     mPolling = false;
     82     mWaiters = 0;
     83 #endif
     84 
     85 #ifdef LOOPER_STATISTICS
     86     mPendingWakeTime = -1;
     87     mPendingWakeCount = 0;
     88     mSampledWakeCycles = 0;
     89     mSampledWakeCountSum = 0;
     90     mSampledWakeLatencySum = 0;
     91 
     92     mSampledPolls = 0;
     93     mSampledZeroPollCount = 0;
     94     mSampledZeroPollLatencySum = 0;
     95     mSampledTimeoutPollCount = 0;
     96     mSampledTimeoutPollLatencySum = 0;
     97 #endif
     98 }
     99 
    100 Looper::~Looper() {
    101     close(mWakeReadPipeFd);
    102     close(mWakeWritePipeFd);
    103 #ifdef LOOPER_USES_EPOLL
    104     close(mEpollFd);
    105 #endif
    106 }
    107 
    108 void Looper::initTLSKey() {
    109     int result = pthread_key_create(& gTLSKey, threadDestructor);
    110     LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
    111 }
    112 
    113 void Looper::threadDestructor(void *st) {
    114     Looper* const self = static_cast<Looper*>(st);
    115     if (self != NULL) {
    116         self->decStrong((void*)threadDestructor);
    117     }
    118 }
    119 
    120 void Looper::setForThread(const sp<Looper>& looper) {
    121     sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
    122 
    123     if (looper != NULL) {
    124         looper->incStrong((void*)threadDestructor);
    125     }
    126 
    127     pthread_setspecific(gTLSKey, looper.get());
    128 
    129     if (old != NULL) {
    130         old->decStrong((void*)threadDestructor);
    131     }
    132 }
    133 
    134 sp<Looper> Looper::getForThread() {
    135     int result = pthread_once(& gTLSOnce, initTLSKey);
    136     LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
    137 
    138     return (Looper*)pthread_getspecific(gTLSKey);
    139 }
    140 
    141 sp<Looper> Looper::prepare(int opts) {
    142     bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
    143     sp<Looper> looper = Looper::getForThread();
    144     if (looper == NULL) {
    145         looper = new Looper(allowNonCallbacks);
    146         Looper::setForThread(looper);
    147     }
    148     if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
    149         LOGW("Looper already prepared for this thread with a different value for the "
    150                 "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
    151     }
    152     return looper;
    153 }
    154 
    155 bool Looper::getAllowNonCallbacks() const {
    156     return mAllowNonCallbacks;
    157 }
    158 
    159 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    160     int result = 0;
    161     for (;;) {
    162         while (mResponseIndex < mResponses.size()) {
    163             const Response& response = mResponses.itemAt(mResponseIndex++);
    164             if (! response.request.callback) {
    165 #if DEBUG_POLL_AND_WAKE
    166                 LOGD("%p ~ pollOnce - returning signalled identifier %d: "
    167                         "fd=%d, events=0x%x, data=%p", this,
    168                         response.request.ident, response.request.fd,
    169                         response.events, response.request.data);
    170 #endif
    171                 if (outFd != NULL) *outFd = response.request.fd;
    172                 if (outEvents != NULL) *outEvents = response.events;
    173                 if (outData != NULL) *outData = response.request.data;
    174                 return response.request.ident;
    175             }
    176         }
    177 
    178         if (result != 0) {
    179 #if DEBUG_POLL_AND_WAKE
    180             LOGD("%p ~ pollOnce - returning result %d", this, result);
    181 #endif
    182             if (outFd != NULL) *outFd = 0;
    183             if (outEvents != NULL) *outEvents = NULL;
    184             if (outData != NULL) *outData = NULL;
    185             return result;
    186         }
    187 
    188         result = pollInner(timeoutMillis);
    189     }
    190 }
    191 
    192 int Looper::pollInner(int timeoutMillis) {
    193 #if DEBUG_POLL_AND_WAKE
    194     LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
    195 #endif
    196 
    197     int result = ALOOPER_POLL_WAKE;
    198     mResponses.clear();
    199     mResponseIndex = 0;
    200 
    201 #ifdef LOOPER_STATISTICS
    202     nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
    203 #endif
    204 
    205 #ifdef LOOPER_USES_EPOLL
    206     struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    207     int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    208     bool acquiredLock = false;
    209 #else
    210     // Wait for wakeAndLock() waiters to run then set mPolling to true.
    211     mLock.lock();
    212     while (mWaiters != 0) {
    213         mResume.wait(mLock);
    214     }
    215     mPolling = true;
    216     mLock.unlock();
    217 
    218     size_t requestedCount = mRequestedFds.size();
    219     int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
    220 #endif
    221 
    222     if (eventCount < 0) {
    223         if (errno == EINTR) {
    224             goto Done;
    225         }
    226 
    227         LOGW("Poll failed with an unexpected error, errno=%d", errno);
    228         result = ALOOPER_POLL_ERROR;
    229         goto Done;
    230     }
    231 
    232     if (eventCount == 0) {
    233 #if DEBUG_POLL_AND_WAKE
    234         LOGD("%p ~ pollOnce - timeout", this);
    235 #endif
    236         result = ALOOPER_POLL_TIMEOUT;
    237         goto Done;
    238     }
    239 
    240 #if DEBUG_POLL_AND_WAKE
    241     LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
    242 #endif
    243 
    244 #ifdef LOOPER_USES_EPOLL
    245     for (int i = 0; i < eventCount; i++) {
    246         int fd = eventItems[i].data.fd;
    247         uint32_t epollEvents = eventItems[i].events;
    248         if (fd == mWakeReadPipeFd) {
    249             if (epollEvents & EPOLLIN) {
    250                 awoken();
    251             } else {
    252                 LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
    253             }
    254         } else {
    255             if (! acquiredLock) {
    256                 mLock.lock();
    257                 acquiredLock = true;
    258             }
    259 
    260             ssize_t requestIndex = mRequests.indexOfKey(fd);
    261             if (requestIndex >= 0) {
    262                 int events = 0;
    263                 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
    264                 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
    265                 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
    266                 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
    267                 pushResponse(events, mRequests.valueAt(requestIndex));
    268             } else {
    269                 LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
    270                         "no longer registered.", epollEvents, fd);
    271             }
    272         }
    273     }
    274     if (acquiredLock) {
    275         mLock.unlock();
    276     }
    277 Done: ;
    278 #else
    279     for (size_t i = 0; i < requestedCount; i++) {
    280         const struct pollfd& requestedFd = mRequestedFds.itemAt(i);
    281 
    282         short pollEvents = requestedFd.revents;
    283         if (pollEvents) {
    284             if (requestedFd.fd == mWakeReadPipeFd) {
    285                 if (pollEvents & POLLIN) {
    286                     awoken();
    287                 } else {
    288                     LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
    289                 }
    290             } else {
    291                 int events = 0;
    292                 if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
    293                 if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
    294                 if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
    295                 if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
    296                 if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
    297                 pushResponse(events, mRequests.itemAt(i));
    298             }
    299             if (--eventCount == 0) {
    300                 break;
    301             }
    302         }
    303     }
    304 
    305 Done:
    306     // Set mPolling to false and wake up the wakeAndLock() waiters.
    307     mLock.lock();
    308     mPolling = false;
    309     if (mWaiters != 0) {
    310         mAwake.broadcast();
    311     }
    312     mLock.unlock();
    313 #endif
    314 
    315 #ifdef LOOPER_STATISTICS
    316     nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
    317     mSampledPolls += 1;
    318     if (timeoutMillis == 0) {
    319         mSampledZeroPollCount += 1;
    320         mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
    321     } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
    322         mSampledTimeoutPollCount += 1;
    323         mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
    324                 - milliseconds_to_nanoseconds(timeoutMillis);
    325     }
    326     if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
    327         LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
    328                 0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
    329                 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
    330         mSampledPolls = 0;
    331         mSampledZeroPollCount = 0;
    332         mSampledZeroPollLatencySum = 0;
    333         mSampledTimeoutPollCount = 0;
    334         mSampledTimeoutPollLatencySum = 0;
    335     }
    336 #endif
    337 
    338     for (size_t i = 0; i < mResponses.size(); i++) {
    339         const Response& response = mResponses.itemAt(i);
    340         if (response.request.callback) {
    341 #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
    342             LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
    343                     response.request.fd, response.events, response.request.data);
    344 #endif
    345             int callbackResult = response.request.callback(
    346                     response.request.fd, response.events, response.request.data);
    347             if (callbackResult == 0) {
    348                 removeFd(response.request.fd);
    349             }
    350 
    351             result = ALOOPER_POLL_CALLBACK;
    352         }
    353     }
    354     return result;
    355 }
    356 
    357 int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    358     if (timeoutMillis <= 0) {
    359         int result;
    360         do {
    361             result = pollOnce(timeoutMillis, outFd, outEvents, outData);
    362         } while (result == ALOOPER_POLL_CALLBACK);
    363         return result;
    364     } else {
    365         nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
    366                 + milliseconds_to_nanoseconds(timeoutMillis);
    367 
    368         for (;;) {
    369             int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
    370             if (result != ALOOPER_POLL_CALLBACK) {
    371                 return result;
    372             }
    373 
    374             nsecs_t timeoutNanos = endTime - systemTime(SYSTEM_TIME_MONOTONIC);
    375             if (timeoutNanos <= 0) {
    376                 return ALOOPER_POLL_TIMEOUT;
    377             }
    378 
    379             timeoutMillis = int(nanoseconds_to_milliseconds(timeoutNanos + 999999LL));
    380         }
    381     }
    382 }
    383 
    384 void Looper::wake() {
    385 #if DEBUG_POLL_AND_WAKE
    386     LOGD("%p ~ wake", this);
    387 #endif
    388 
    389 #ifdef LOOPER_STATISTICS
    390     // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
    391     if (mPendingWakeCount++ == 0) {
    392         mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
    393     }
    394 #endif
    395 
    396     ssize_t nWrite;
    397     do {
    398         nWrite = write(mWakeWritePipeFd, "W", 1);
    399     } while (nWrite == -1 && errno == EINTR);
    400 
    401     if (nWrite != 1) {
    402         if (errno != EAGAIN) {
    403             LOGW("Could not write wake signal, errno=%d", errno);
    404         }
    405     }
    406 }
    407 
    408 void Looper::awoken() {
    409 #if DEBUG_POLL_AND_WAKE
    410     LOGD("%p ~ awoken", this);
    411 #endif
    412 
    413 #ifdef LOOPER_STATISTICS
    414     if (mPendingWakeCount == 0) {
    415         LOGD("%p ~ awoken: spurious!", this);
    416     } else {
    417         mSampledWakeCycles += 1;
    418         mSampledWakeCountSum += mPendingWakeCount;
    419         mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
    420         mPendingWakeCount = 0;
    421         mPendingWakeTime = -1;
    422         if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
    423             LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
    424                     0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
    425                     float(mSampledWakeCountSum) / mSampledWakeCycles);
    426             mSampledWakeCycles = 0;
    427             mSampledWakeCountSum = 0;
    428             mSampledWakeLatencySum = 0;
    429         }
    430     }
    431 #endif
    432 
    433     char buffer[16];
    434     ssize_t nRead;
    435     do {
    436         nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    437     } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
    438 }
    439 
    440 void Looper::pushResponse(int events, const Request& request) {
    441     Response response;
    442     response.events = events;
    443     response.request = request;
    444     mResponses.push(response);
    445 }
    446 
    447 int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
    448 #if DEBUG_CALLBACKS
    449     LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
    450             events, callback, data);
    451 #endif
    452 
    453     if (! callback) {
    454         if (! mAllowNonCallbacks) {
    455             LOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
    456             return -1;
    457         }
    458 
    459         if (ident < 0) {
    460             LOGE("Invalid attempt to set NULL callback with ident <= 0.");
    461             return -1;
    462         }
    463     }
    464 
    465 #ifdef LOOPER_USES_EPOLL
    466     int epollEvents = 0;
    467     if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
    468     if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
    469 
    470     { // acquire lock
    471         AutoMutex _l(mLock);
    472 
    473         Request request;
    474         request.fd = fd;
    475         request.ident = ident;
    476         request.callback = callback;
    477         request.data = data;
    478 
    479         struct epoll_event eventItem;
    480         memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    481         eventItem.events = epollEvents;
    482         eventItem.data.fd = fd;
    483 
    484         ssize_t requestIndex = mRequests.indexOfKey(fd);
    485         if (requestIndex < 0) {
    486             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
    487             if (epollResult < 0) {
    488                 LOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
    489                 return -1;
    490             }
    491             mRequests.add(fd, request);
    492         } else {
    493             int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
    494             if (epollResult < 0) {
    495                 LOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
    496                 return -1;
    497             }
    498             mRequests.replaceValueAt(requestIndex, request);
    499         }
    500     } // release lock
    501 #else
    502     int pollEvents = 0;
    503     if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN;
    504     if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT;
    505 
    506     wakeAndLock(); // acquire lock
    507 
    508     struct pollfd requestedFd;
    509     requestedFd.fd = fd;
    510     requestedFd.events = pollEvents;
    511 
    512     Request request;
    513     request.fd = fd;
    514     request.ident = ident;
    515     request.callback = callback;
    516     request.data = data;
    517     ssize_t index = getRequestIndexLocked(fd);
    518     if (index < 0) {
    519         mRequestedFds.push(requestedFd);
    520         mRequests.push(request);
    521     } else {
    522         mRequestedFds.replaceAt(requestedFd, size_t(index));
    523         mRequests.replaceAt(request, size_t(index));
    524     }
    525 
    526     mLock.unlock(); // release lock
    527 #endif
    528     return 1;
    529 }
    530 
    531 int Looper::removeFd(int fd) {
    532 #if DEBUG_CALLBACKS
    533     LOGD("%p ~ removeFd - fd=%d", this, fd);
    534 #endif
    535 
    536 #ifdef LOOPER_USES_EPOLL
    537     { // acquire lock
    538         AutoMutex _l(mLock);
    539         ssize_t requestIndex = mRequests.indexOfKey(fd);
    540         if (requestIndex < 0) {
    541             return 0;
    542         }
    543 
    544         int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
    545         if (epollResult < 0) {
    546             LOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
    547             return -1;
    548         }
    549 
    550         mRequests.removeItemsAt(requestIndex);
    551     } // release lock
    552     return 1;
    553 #else
    554     wakeAndLock(); // acquire lock
    555 
    556     ssize_t index = getRequestIndexLocked(fd);
    557     if (index >= 0) {
    558         mRequestedFds.removeAt(size_t(index));
    559         mRequests.removeAt(size_t(index));
    560     }
    561 
    562     mLock.unlock(); // release lock
    563     return index >= 0;
    564 #endif
    565 }
    566 
    567 #ifndef LOOPER_USES_EPOLL
    568 ssize_t Looper::getRequestIndexLocked(int fd) {
    569     size_t requestCount = mRequestedFds.size();
    570 
    571     for (size_t i = 0; i < requestCount; i++) {
    572         if (mRequestedFds.itemAt(i).fd == fd) {
    573             return i;
    574         }
    575     }
    576 
    577     return -1;
    578 }
    579 
    580 void Looper::wakeAndLock() {
    581     mLock.lock();
    582 
    583     mWaiters += 1;
    584     while (mPolling) {
    585         wake();
    586         mAwake.wait(mLock);
    587     }
    588 
    589     mWaiters -= 1;
    590     if (mWaiters == 0) {
    591         mResume.signal();
    592     }
    593 }
    594 #endif
    595 
    596 } // namespace android
    597