1 /* 2 * Copyright (C) 2010 Apple Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' 14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS 17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF 23 * THE POSSIBILITY OF SUCH DAMAGE. 24 */ 25 26 #include "config.h" 27 #include "WorkQueue.h" 28 29 #include <WebCore/NotImplemented.h> 30 #include <wtf/Threading.h> 31 32 inline WorkQueue::WorkItemWin::WorkItemWin(PassOwnPtr<WorkItem> item, WorkQueue* queue) 33 : m_item(item) 34 , m_queue(queue) 35 { 36 } 37 38 PassRefPtr<WorkQueue::WorkItemWin> WorkQueue::WorkItemWin::create(PassOwnPtr<WorkItem> item, WorkQueue* queue) 39 { 40 return adoptRef(new WorkItemWin(item, queue)); 41 } 42 43 WorkQueue::WorkItemWin::~WorkItemWin() 44 { 45 } 46 47 inline WorkQueue::HandleWorkItem::HandleWorkItem(HANDLE handle, PassOwnPtr<WorkItem> item, WorkQueue* queue) 48 : WorkItemWin(item, queue) 49 , m_handle(handle) 50 , m_waitHandle(0) 51 { 52 ASSERT_ARG(handle, handle); 53 } 54 55 PassRefPtr<WorkQueue::HandleWorkItem> WorkQueue::HandleWorkItem::createByAdoptingHandle(HANDLE handle, PassOwnPtr<WorkItem> item, WorkQueue* queue) 56 { 57 return adoptRef(new HandleWorkItem(handle, item, queue)); 58 } 59 60 WorkQueue::HandleWorkItem::~HandleWorkItem() 61 { 62 ::CloseHandle(m_handle); 63 } 64 65 void WorkQueue::handleCallback(void* context, BOOLEAN timerOrWaitFired) 66 { 67 ASSERT_ARG(context, context); 68 ASSERT_ARG(timerOrWaitFired, !timerOrWaitFired); 69 70 WorkItemWin* item = static_cast<WorkItemWin*>(context); 71 WorkQueue* queue = item->queue(); 72 73 { 74 MutexLocker lock(queue->m_workItemQueueLock); 75 queue->m_workItemQueue.append(item); 76 77 // If no other thread is performing work, we can do it on this thread. 78 if (!queue->tryRegisterAsWorkThread()) { 79 // Some other thread is performing work. Since we hold the queue lock, we can be sure 80 // that the work thread is not exiting due to an empty queue and will process the work 81 // item we just added to it. If we weren't holding the lock we'd have to signal 82 // m_performWorkEvent to make sure the work item got picked up. 83 return; 84 } 85 } 86 87 queue->performWorkOnRegisteredWorkThread(); 88 } 89 90 void WorkQueue::registerHandle(HANDLE handle, PassOwnPtr<WorkItem> item) 91 { 92 RefPtr<HandleWorkItem> handleItem = HandleWorkItem::createByAdoptingHandle(handle, item, this); 93 94 { 95 MutexLocker lock(m_handlesLock); 96 ASSERT_ARG(handle, !m_handles.contains(handle)); 97 m_handles.set(handle, handleItem); 98 } 99 100 HANDLE waitHandle; 101 if (!::RegisterWaitForSingleObject(&waitHandle, handle, handleCallback, handleItem.get(), INFINITE, WT_EXECUTEDEFAULT)) { 102 DWORD error = ::GetLastError(); 103 ASSERT_NOT_REACHED(); 104 } 105 handleItem->setWaitHandle(waitHandle); 106 } 107 108 void WorkQueue::unregisterAndCloseHandle(HANDLE handle) 109 { 110 RefPtr<HandleWorkItem> item; 111 { 112 MutexLocker locker(m_handlesLock); 113 ASSERT_ARG(handle, m_handles.contains(handle)); 114 item = m_handles.take(handle); 115 } 116 117 unregisterWaitAndDestroyItemSoon(item.release()); 118 } 119 120 DWORD WorkQueue::workThreadCallback(void* context) 121 { 122 ASSERT_ARG(context, context); 123 124 WorkQueue* queue = static_cast<WorkQueue*>(context); 125 126 if (!queue->tryRegisterAsWorkThread()) 127 return 0; 128 129 queue->performWorkOnRegisteredWorkThread(); 130 return 0; 131 } 132 133 void WorkQueue::performWorkOnRegisteredWorkThread() 134 { 135 ASSERT(m_isWorkThreadRegistered); 136 137 bool isValid = true; 138 139 m_workItemQueueLock.lock(); 140 141 while (isValid && !m_workItemQueue.isEmpty()) { 142 Vector<RefPtr<WorkItemWin> > workItemQueue; 143 m_workItemQueue.swap(workItemQueue); 144 145 // Allow more work to be scheduled while we're not using the queue directly. 146 m_workItemQueueLock.unlock(); 147 for (size_t i = 0; i < workItemQueue.size(); ++i) { 148 MutexLocker locker(m_isValidMutex); 149 isValid = m_isValid; 150 if (!isValid) 151 break; 152 workItemQueue[i]->item()->execute(); 153 } 154 m_workItemQueueLock.lock(); 155 } 156 157 // One invariant we maintain is that any work scheduled while a work thread is registered will 158 // be handled by that work thread. Unregister as the work thread while the queue lock is still 159 // held so that no work can be scheduled while we're still registered. 160 unregisterAsWorkThread(); 161 162 m_workItemQueueLock.unlock(); 163 } 164 165 void WorkQueue::platformInitialize(const char* name) 166 { 167 m_isWorkThreadRegistered = 0; 168 m_timerQueue = ::CreateTimerQueue(); 169 ASSERT_WITH_MESSAGE(m_timerQueue, "::CreateTimerQueue failed with error %lu", ::GetLastError()); 170 } 171 172 bool WorkQueue::tryRegisterAsWorkThread() 173 { 174 LONG result = ::InterlockedCompareExchange(&m_isWorkThreadRegistered, 1, 0); 175 ASSERT(!result || result == 1); 176 return !result; 177 } 178 179 void WorkQueue::unregisterAsWorkThread() 180 { 181 LONG result = ::InterlockedCompareExchange(&m_isWorkThreadRegistered, 0, 1); 182 ASSERT_UNUSED(result, result == 1); 183 } 184 185 void WorkQueue::platformInvalidate() 186 { 187 #if !ASSERT_DISABLED 188 MutexLocker lock(m_handlesLock); 189 ASSERT(m_handles.isEmpty()); 190 #endif 191 192 // FIXME: We need to ensure that any timer-queue timers that fire after this point don't try to 193 // access this WorkQueue <http://webkit.org/b/44690>. 194 ::DeleteTimerQueueEx(m_timerQueue, 0); 195 } 196 197 void WorkQueue::scheduleWork(PassOwnPtr<WorkItem> item) 198 { 199 MutexLocker locker(m_workItemQueueLock); 200 201 m_workItemQueue.append(WorkItemWin::create(item, this)); 202 203 // Spawn a work thread to perform the work we just added. As an optimization, we avoid 204 // spawning the thread if a work thread is already registered. This prevents multiple work 205 // threads from being spawned in most cases. (Note that when a work thread has been spawned but 206 // hasn't registered itself yet, m_isWorkThreadRegistered will be false and we'll end up 207 // spawning a second work thread here. But work thread registration process will ensure that 208 // only one thread actually ends up performing work.) 209 if (!m_isWorkThreadRegistered) 210 ::QueueUserWorkItem(workThreadCallback, this, WT_EXECUTEDEFAULT); 211 } 212 213 struct TimerContext : public ThreadSafeRefCounted<TimerContext> { 214 static PassRefPtr<TimerContext> create() { return adoptRef(new TimerContext); } 215 216 WorkQueue* queue; 217 OwnPtr<WorkItem> item; 218 Mutex timerMutex; 219 HANDLE timer; 220 221 private: 222 TimerContext() : queue(0), timer(0) { } 223 }; 224 225 void WorkQueue::timerCallback(void* context, BOOLEAN timerOrWaitFired) 226 { 227 ASSERT_ARG(context, context); 228 ASSERT_UNUSED(timerOrWaitFired, timerOrWaitFired); 229 230 // Balanced by leakRef in scheduleWorkAfterDelay. 231 RefPtr<TimerContext> timerContext = adoptRef(static_cast<TimerContext*>(context)); 232 233 timerContext->queue->scheduleWork(timerContext->item.release()); 234 235 MutexLocker lock(timerContext->timerMutex); 236 ASSERT(timerContext->timer); 237 ASSERT(timerContext->queue->m_timerQueue); 238 if (!::DeleteTimerQueueTimer(timerContext->queue->m_timerQueue, timerContext->timer, 0)) 239 ASSERT_WITH_MESSAGE(false, "::DeleteTimerQueueTimer failed with error %lu", ::GetLastError()); 240 } 241 242 void WorkQueue::scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item, double delay) 243 { 244 ASSERT(m_timerQueue); 245 246 RefPtr<TimerContext> context = TimerContext::create(); 247 context->queue = this; 248 context->item = item; 249 250 { 251 // The timer callback could fire before ::CreateTimerQueueTimer even returns, so we protect 252 // context->timer with a mutex to ensure the timer callback doesn't access it before the 253 // timer handle has been stored in it. 254 MutexLocker lock(context->timerMutex); 255 256 // Since our timer callback is quick, we can execute in the timer thread itself and avoid 257 // an extra thread switch over to a worker thread. 258 if (!::CreateTimerQueueTimer(&context->timer, m_timerQueue, timerCallback, context.get(), delay * 1000, 0, WT_EXECUTEINTIMERTHREAD)) { 259 ASSERT_WITH_MESSAGE(false, "::CreateTimerQueueTimer failed with error %lu", ::GetLastError()); 260 return; 261 } 262 } 263 264 // The timer callback will handle destroying context. 265 context.release().leakRef(); 266 } 267 268 void WorkQueue::unregisterWaitAndDestroyItemSoon(PassRefPtr<HandleWorkItem> item) 269 { 270 // We're going to make a blocking call to ::UnregisterWaitEx before closing the handle. (The 271 // blocking version of ::UnregisterWaitEx is much simpler than the non-blocking version.) If we 272 // do this on the current thread, we'll deadlock if we're currently in a callback function for 273 // the wait we're unregistering. So instead we do it asynchronously on some other worker thread. 274 275 ::QueueUserWorkItem(unregisterWaitAndDestroyItemCallback, item.leakRef(), WT_EXECUTEDEFAULT); 276 } 277 278 DWORD WINAPI WorkQueue::unregisterWaitAndDestroyItemCallback(void* context) 279 { 280 ASSERT_ARG(context, context); 281 RefPtr<HandleWorkItem> item = adoptRef(static_cast<HandleWorkItem*>(context)); 282 283 // Now that we know we're not in a callback function for the wait we're unregistering, we can 284 // make a blocking call to ::UnregisterWaitEx. 285 if (!::UnregisterWaitEx(item->waitHandle(), INVALID_HANDLE_VALUE)) { 286 DWORD error = ::GetLastError(); 287 ASSERT_NOT_REACHED(); 288 } 289 290 return 0; 291 } 292