1 /* ------------------------------------------------------------------ 2 * Copyright (C) 1998-2009 PacketVideo 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 13 * express or implied. 14 * See the License for the specific language governing permissions 15 * and limitations under the License. 16 * ------------------------------------------------------------------- 17 */ 18 19 #include "threadsafe_queue.h" 20 21 //Only do sem wait on pre-emptive thread OS. 22 //The reason is that sem wait is not always available on 23 //non-pre-emptive OS (such as under Brew applet thread) 24 #if !(OSCL_HAS_NON_PREEMPTIVE_THREAD_SUPPORT) 25 #define USE_SEM_WAIT 1 26 #else 27 #define USE_SEM_WAIT 0 28 #endif 29 30 OSCL_EXPORT_REF ThreadSafeQueue::ThreadSafeQueue() 31 : OsclActiveObject(OsclActiveObject::EPriorityNominal, "ThreadSafeQueue") 32 { 33 iObserver = NULL; 34 iCounter = 1; 35 if (OsclThread::GetId(iThreadId) != OsclProcStatus::SUCCESS_ERROR) 36 OsclError::Leave(OsclErrSystemCallFailed); 37 #if USE_SEM_WAIT 38 iQueueReadySem.Create(); 39 #endif 40 iQueueMut.Create(); 41 AddToScheduler(); 42 PendForExec(); 43 iQueueReadySem.Signal(); 44 } 45 46 OSCL_EXPORT_REF ThreadSafeQueue::~ThreadSafeQueue() 47 { 48 RemoveFromScheduler(); 49 #if USE_SEM_WAIT 50 iQueueReadySem.Close(); 51 #endif 52 iQueueMut.Close(); 53 } 54 55 OSCL_EXPORT_REF bool ThreadSafeQueue::IsInThread() 56 { 57 TOsclThreadId id; 58 if (OsclThread::GetId(id) == OsclProcStatus::SUCCESS_ERROR) 59 { 60 return OsclThread::CompareId(id, iThreadId); 61 } 62 return false; 63 } 64 65 OSCL_EXPORT_REF void ThreadSafeQueue::Configure(ThreadSafeQueueObserver* aObs, uint32 aReserve, uint32 aId) 66 { 67 iQueueMut.Lock(); 68 iObserver = aObs; 69 iQueue.reserve(aReserve); 70 iCounter = aId; 71 iQueueMut.Unlock(); 72 } 73 74 OSCL_EXPORT_REF ThreadSafeQueueId ThreadSafeQueue::AddToQueue(OsclAny *EventData, ThreadSafeQueueId* aId) 75 { 76 iQueueMut.Lock(); 77 uint32 count = (aId) ? *aId : ++iCounter; 78 ThreadSafeQueueElement elem(count, EventData); 79 iQueue.push_back(elem); 80 uint32 size = iQueue.size(); 81 iQueueMut.Unlock(); 82 83 //Signal the AO. Only signal when the queue was previously empty in order 84 // to minimize the amount of blocking in this call. 85 if (size == 1) 86 { 87 #if USE_SEM_WAIT 88 //Wait on the AO to be ready to be signaled. 89 iQueueReadySem.Wait(); 90 PendComplete(OSCL_REQUEST_ERR_NONE); 91 #else 92 //To avoid problems under brew applet, don't do a sem wait here. 93 //instead just check AO status and signal if needed. It should 94 //not be possible to lose data, since the only time AO is *not* ready 95 //to be signaled is when a notification is already pending. 96 //The reason to not do this in all platforms is that Status() call 97 //is not thread-safe, but on non-preemptive OS it's ok here. 98 if (Status() == OSCL_REQUEST_PENDING) 99 PendComplete(OSCL_REQUEST_ERR_NONE); 100 #endif 101 } 102 103 return count; 104 } 105 106 OSCL_EXPORT_REF uint32 ThreadSafeQueue::DeQueue(ThreadSafeQueueId& aId, OsclAny*& aData) 107 { 108 uint32 num = 0; 109 iQueueMut.Lock(); 110 if (iQueue.size()) 111 { 112 aId = iQueue[0].iId; 113 aData = iQueue[0].iData; 114 iQueue.erase(&iQueue[0]); 115 num++; 116 } 117 iQueueMut.Unlock(); 118 return num; 119 } 120 121 void ThreadSafeQueue::Run() 122 { 123 iQueueMut.Lock(); 124 PendForExec(); 125 #if USE_SEM_WAIT 126 iQueueReadySem.Signal(); 127 #endif 128 uint32 count = iQueue.size(); 129 ThreadSafeQueueObserver* obs = iObserver; 130 iQueueMut.Unlock(); 131 132 //note: don't do the callback under the lock, in order to allow 133 //de-queueing the data in the callback. this creates the possibility 134 //that queue size may not equal "count" in the callback. 135 if (count && obs) 136 obs->ThreadSafeQueueDataAvailable(this); 137 } 138 139 140 141 142