Home | History | Annotate | Download | only in src
      1 /* ------------------------------------------------------------------
      2  * Copyright (C) 1998-2009 PacketVideo
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
     13  * express or implied.
     14  * See the License for the specific language governing permissions
     15  * and limitations under the License.
     16  * -------------------------------------------------------------------
     17  */
     18 
     19 #include "threadsafe_queue.h"
     20 
     21 //Only do sem wait on pre-emptive thread OS.
     22 //The reason is that sem wait is not always available on
     23 //non-pre-emptive OS (such as under Brew applet thread)
     24 #if !(OSCL_HAS_NON_PREEMPTIVE_THREAD_SUPPORT)
     25 #define USE_SEM_WAIT 1
     26 #else
     27 #define USE_SEM_WAIT 0
     28 #endif
     29 
     30 OSCL_EXPORT_REF ThreadSafeQueue::ThreadSafeQueue()
     31         : OsclActiveObject(OsclActiveObject::EPriorityNominal, "ThreadSafeQueue")
     32 {
     33     iObserver = NULL;
     34     iCounter = 1;
     35     if (OsclThread::GetId(iThreadId) != OsclProcStatus::SUCCESS_ERROR)
     36         OsclError::Leave(OsclErrSystemCallFailed);
     37 #if USE_SEM_WAIT
     38     iQueueReadySem.Create();
     39 #endif
     40     iQueueMut.Create();
     41     AddToScheduler();
     42     PendForExec();
     43     iQueueReadySem.Signal();
     44 }
     45 
     46 OSCL_EXPORT_REF ThreadSafeQueue::~ThreadSafeQueue()
     47 {
     48     RemoveFromScheduler();
     49 #if USE_SEM_WAIT
     50     iQueueReadySem.Close();
     51 #endif
     52     iQueueMut.Close();
     53 }
     54 
     55 OSCL_EXPORT_REF bool ThreadSafeQueue::IsInThread()
     56 {
     57     TOsclThreadId id;
     58     if (OsclThread::GetId(id) == OsclProcStatus::SUCCESS_ERROR)
     59     {
     60         return OsclThread::CompareId(id, iThreadId);
     61     }
     62     return false;
     63 }
     64 
     65 OSCL_EXPORT_REF void ThreadSafeQueue::Configure(ThreadSafeQueueObserver* aObs, uint32 aReserve, uint32 aId)
     66 {
     67     iQueueMut.Lock();
     68     iObserver = aObs;
     69     iQueue.reserve(aReserve);
     70     iCounter = aId;
     71     iQueueMut.Unlock();
     72 }
     73 
     74 OSCL_EXPORT_REF ThreadSafeQueueId ThreadSafeQueue::AddToQueue(OsclAny *EventData, ThreadSafeQueueId* aId)
     75 {
     76     iQueueMut.Lock();
     77     uint32 count = (aId) ? *aId : ++iCounter;
     78     ThreadSafeQueueElement elem(count, EventData);
     79     iQueue.push_back(elem);
     80     uint32 size = iQueue.size();
     81     iQueueMut.Unlock();
     82 
     83     //Signal the AO.  Only signal when the queue was previously empty in order
     84     // to minimize the amount of blocking in this call.
     85     if (size == 1)
     86     {
     87 #if USE_SEM_WAIT
     88         //Wait on the AO to be ready to be signaled.
     89         iQueueReadySem.Wait();
     90         PendComplete(OSCL_REQUEST_ERR_NONE);
     91 #else
     92         //To avoid problems under brew applet, don't do a sem wait here.
     93         //instead just check AO status and signal if needed.  It should
     94         //not be possible to lose data, since the only time AO is *not* ready
     95         //to be signaled is when a notification is already pending.
     96         //The reason to not do this in all platforms is that Status() call
     97         //is not thread-safe, but on non-preemptive OS it's ok here.
     98         if (Status() == OSCL_REQUEST_PENDING)
     99             PendComplete(OSCL_REQUEST_ERR_NONE);
    100 #endif
    101     }
    102 
    103     return count;
    104 }
    105 
    106 OSCL_EXPORT_REF uint32 ThreadSafeQueue::DeQueue(ThreadSafeQueueId& aId, OsclAny*& aData)
    107 {
    108     uint32 num = 0;
    109     iQueueMut.Lock();
    110     if (iQueue.size())
    111     {
    112         aId = iQueue[0].iId;
    113         aData = iQueue[0].iData;
    114         iQueue.erase(&iQueue[0]);
    115         num++;
    116     }
    117     iQueueMut.Unlock();
    118     return num;
    119 }
    120 
    121 void ThreadSafeQueue::Run()
    122 {
    123     iQueueMut.Lock();
    124     PendForExec();
    125 #if USE_SEM_WAIT
    126     iQueueReadySem.Signal();
    127 #endif
    128     uint32 count = iQueue.size();
    129     ThreadSafeQueueObserver* obs = iObserver;
    130     iQueueMut.Unlock();
    131 
    132     //note: don't do the callback under the lock, in order to allow
    133     //de-queueing the data in the callback.  this creates the possibility
    134     //that queue size may not equal "count" in the callback.
    135     if (count && obs)
    136         obs->ThreadSafeQueueDataAvailable(this);
    137 }
    138 
    139 
    140 
    141 
    142