Home | History | Annotate | Download | only in libstagefright
      1 /*
      2  * Copyright (C) 2009 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #undef __STRICT_ANSI__
     18 #define __STDINT_LIMITS
     19 #define __STDC_LIMIT_MACROS
     20 #include <stdint.h>
     21 
     22 //#define LOG_NDEBUG 0
     23 #define LOG_TAG "TimedEventQueue"
     24 #include <utils/Log.h>
     25 #include <utils/threads.h>
     26 
     27 #include "include/TimedEventQueue.h"
     28 
     29 #include <sys/prctl.h>
     30 #include <sys/time.h>
     31 
     32 #include <media/stagefright/foundation/ADebug.h>
     33 #include <media/stagefright/foundation/ALooper.h>
     34 
     35 namespace android {
     36 
     37 TimedEventQueue::TimedEventQueue()
     38     : mNextEventID(1),
     39       mRunning(false),
     40       mStopped(false) {
     41 }
     42 
     43 TimedEventQueue::~TimedEventQueue() {
     44     stop();
     45 }
     46 
     47 void TimedEventQueue::start() {
     48     if (mRunning) {
     49         return;
     50     }
     51 
     52     mStopped = false;
     53 
     54     pthread_attr_t attr;
     55     pthread_attr_init(&attr);
     56     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
     57 
     58     pthread_create(&mThread, &attr, ThreadWrapper, this);
     59 
     60     pthread_attr_destroy(&attr);
     61 
     62     mRunning = true;
     63 }
     64 
     65 void TimedEventQueue::stop(bool flush) {
     66     if (!mRunning) {
     67         return;
     68     }
     69 
     70     if (flush) {
     71         postEventToBack(new StopEvent);
     72     } else {
     73         postTimedEvent(new StopEvent, INT64_MIN);
     74     }
     75 
     76     void *dummy;
     77     pthread_join(mThread, &dummy);
     78 
     79     mQueue.clear();
     80 
     81     mRunning = false;
     82 }
     83 
     84 TimedEventQueue::event_id TimedEventQueue::postEvent(const sp<Event> &event) {
     85     // Reserve an earlier timeslot an INT64_MIN to be able to post
     86     // the StopEvent to the absolute head of the queue.
     87     return postTimedEvent(event, INT64_MIN + 1);
     88 }
     89 
     90 TimedEventQueue::event_id TimedEventQueue::postEventToBack(
     91         const sp<Event> &event) {
     92     return postTimedEvent(event, INT64_MAX);
     93 }
     94 
     95 TimedEventQueue::event_id TimedEventQueue::postEventWithDelay(
     96         const sp<Event> &event, int64_t delay_us) {
     97     CHECK(delay_us >= 0);
     98     return postTimedEvent(event, ALooper::GetNowUs() + delay_us);
     99 }
    100 
    101 TimedEventQueue::event_id TimedEventQueue::postTimedEvent(
    102         const sp<Event> &event, int64_t realtime_us) {
    103     Mutex::Autolock autoLock(mLock);
    104 
    105     event->setEventID(mNextEventID++);
    106 
    107     List<QueueItem>::iterator it = mQueue.begin();
    108     while (it != mQueue.end() && realtime_us >= (*it).realtime_us) {
    109         ++it;
    110     }
    111 
    112     QueueItem item;
    113     item.event = event;
    114     item.realtime_us = realtime_us;
    115 
    116     if (it == mQueue.begin()) {
    117         mQueueHeadChangedCondition.signal();
    118     }
    119 
    120     mQueue.insert(it, item);
    121 
    122     mQueueNotEmptyCondition.signal();
    123 
    124     return event->eventID();
    125 }
    126 
    127 static bool MatchesEventID(
    128         void *cookie, const sp<TimedEventQueue::Event> &event) {
    129     TimedEventQueue::event_id *id =
    130         static_cast<TimedEventQueue::event_id *>(cookie);
    131 
    132     if (event->eventID() != *id) {
    133         return false;
    134     }
    135 
    136     *id = 0;
    137 
    138     return true;
    139 }
    140 
    141 bool TimedEventQueue::cancelEvent(event_id id) {
    142     if (id == 0) {
    143         return false;
    144     }
    145 
    146     cancelEvents(&MatchesEventID, &id, true /* stopAfterFirstMatch */);
    147 
    148     // if MatchesEventID found a match, it will have set id to 0
    149     // (which is not a valid event_id).
    150 
    151     return id == 0;
    152 }
    153 
    154 void TimedEventQueue::cancelEvents(
    155         bool (*predicate)(void *cookie, const sp<Event> &event),
    156         void *cookie,
    157         bool stopAfterFirstMatch) {
    158     Mutex::Autolock autoLock(mLock);
    159 
    160     List<QueueItem>::iterator it = mQueue.begin();
    161     while (it != mQueue.end()) {
    162         if (!(*predicate)(cookie, (*it).event)) {
    163             ++it;
    164             continue;
    165         }
    166 
    167         if (it == mQueue.begin()) {
    168             mQueueHeadChangedCondition.signal();
    169         }
    170 
    171         ALOGV("cancelling event %d", (*it).event->eventID());
    172 
    173         (*it).event->setEventID(0);
    174         it = mQueue.erase(it);
    175 
    176         if (stopAfterFirstMatch) {
    177             return;
    178         }
    179     }
    180 }
    181 
    182 // static
    183 void *TimedEventQueue::ThreadWrapper(void *me) {
    184 
    185     androidSetThreadPriority(0, ANDROID_PRIORITY_FOREGROUND);
    186 
    187     static_cast<TimedEventQueue *>(me)->threadEntry();
    188 
    189     return NULL;
    190 }
    191 
    192 void TimedEventQueue::threadEntry() {
    193     prctl(PR_SET_NAME, (unsigned long)"TimedEventQueue", 0, 0, 0);
    194 
    195     for (;;) {
    196         int64_t now_us = 0;
    197         sp<Event> event;
    198 
    199         {
    200             Mutex::Autolock autoLock(mLock);
    201 
    202             if (mStopped) {
    203                 break;
    204             }
    205 
    206             while (mQueue.empty()) {
    207                 mQueueNotEmptyCondition.wait(mLock);
    208             }
    209 
    210             event_id eventID = 0;
    211             for (;;) {
    212                 if (mQueue.empty()) {
    213                     // The only event in the queue could have been cancelled
    214                     // while we were waiting for its scheduled time.
    215                     break;
    216                 }
    217 
    218                 List<QueueItem>::iterator it = mQueue.begin();
    219                 eventID = (*it).event->eventID();
    220 
    221                 now_us = ALooper::GetNowUs();
    222                 int64_t when_us = (*it).realtime_us;
    223 
    224                 int64_t delay_us;
    225                 if (when_us < 0 || when_us == INT64_MAX) {
    226                     delay_us = 0;
    227                 } else {
    228                     delay_us = when_us - now_us;
    229                 }
    230 
    231                 if (delay_us <= 0) {
    232                     break;
    233                 }
    234 
    235                 static int64_t kMaxTimeoutUs = 10000000ll;  // 10 secs
    236                 bool timeoutCapped = false;
    237                 if (delay_us > kMaxTimeoutUs) {
    238                     ALOGW("delay_us exceeds max timeout: %lld us", delay_us);
    239 
    240                     // We'll never block for more than 10 secs, instead
    241                     // we will split up the full timeout into chunks of
    242                     // 10 secs at a time. This will also avoid overflow
    243                     // when converting from us to ns.
    244                     delay_us = kMaxTimeoutUs;
    245                     timeoutCapped = true;
    246                 }
    247 
    248                 status_t err = mQueueHeadChangedCondition.waitRelative(
    249                         mLock, delay_us * 1000ll);
    250 
    251                 if (!timeoutCapped && err == -ETIMEDOUT) {
    252                     // We finally hit the time this event is supposed to
    253                     // trigger.
    254                     now_us = ALooper::GetNowUs();
    255                     break;
    256                 }
    257             }
    258 
    259             // The event w/ this id may have been cancelled while we're
    260             // waiting for its trigger-time, in that case
    261             // removeEventFromQueue_l will return NULL.
    262             // Otherwise, the QueueItem will be removed
    263             // from the queue and the referenced event returned.
    264             event = removeEventFromQueue_l(eventID);
    265         }
    266 
    267         if (event != NULL) {
    268             // Fire event with the lock NOT held.
    269             event->fire(this, now_us);
    270         }
    271     }
    272 }
    273 
    274 sp<TimedEventQueue::Event> TimedEventQueue::removeEventFromQueue_l(
    275         event_id id) {
    276     for (List<QueueItem>::iterator it = mQueue.begin();
    277          it != mQueue.end(); ++it) {
    278         if ((*it).event->eventID() == id) {
    279             sp<Event> event = (*it).event;
    280             event->setEventID(0);
    281 
    282             mQueue.erase(it);
    283 
    284             return event;
    285         }
    286     }
    287 
    288     ALOGW("Event %d was not found in the queue, already cancelled?", id);
    289 
    290     return NULL;
    291 }
    292 
    293 }  // namespace android
    294 
    295