1 /* 2 * Copyright (C) 2011 University of Szeged 3 * Copyright (C) 2011 Gabor Loki <loki (at) webkit.org> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY UNIVERSITY OF SZEGED ``AS IS'' AND ANY 16 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UNIVERSITY OF SZEGED OR 19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 23 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 25 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "config.h" 29 #include "ParallelJobs.h" 30 31 #if ENABLE(THREADING_GENERIC) 32 33 #include "wtf/NumberOfCores.h" 34 35 namespace WTF { 36 37 Vector< RefPtr<ParallelEnvironment::ThreadPrivate> >* ParallelEnvironment::s_threadPool = 0; 38 39 ParallelEnvironment::ParallelEnvironment(ThreadFunction threadFunction, size_t sizeOfParameter, int requestedJobNumber) : 40 m_threadFunction(threadFunction), 41 m_sizeOfParameter(sizeOfParameter) 42 { 43 ASSERT_ARG(requestedJobNumber, requestedJobNumber >= 1); 44 45 int maxNumberOfCores = numberOfProcessorCores(); 46 47 if (!requestedJobNumber || requestedJobNumber > maxNumberOfCores) 48 requestedJobNumber = static_cast<unsigned>(maxNumberOfCores); 49 50 if (!s_threadPool) 51 s_threadPool = new Vector< RefPtr<ThreadPrivate> >(); 52 53 // The main thread should be also a worker. 54 int maxNumberOfNewThreads = requestedJobNumber - 1; 55 56 for (int i = 0; i < maxNumberOfCores && m_threads.size() < static_cast<unsigned>(maxNumberOfNewThreads); ++i) { 57 if (s_threadPool->size() < static_cast<unsigned>(i) + 1U) 58 s_threadPool->append(ThreadPrivate::create()); 59 60 if ((*s_threadPool)[i]->tryLockFor(this)) 61 m_threads.append((*s_threadPool)[i]); 62 } 63 64 m_numberOfJobs = m_threads.size() + 1; 65 } 66 67 void ParallelEnvironment::execute(void* parameters) 68 { 69 unsigned char* currentParameter = static_cast<unsigned char*>(parameters); 70 size_t i; 71 for (i = 0; i < m_threads.size(); ++i) { 72 m_threads[i]->execute(m_threadFunction, currentParameter); 73 currentParameter += m_sizeOfParameter; 74 } 75 76 // The work for the main thread. 77 (*m_threadFunction)(currentParameter); 78 79 // Wait until all jobs are done. 80 for (i = 0; i < m_threads.size(); ++i) 81 m_threads[i]->waitForFinish(); 82 } 83 84 bool ParallelEnvironment::ThreadPrivate::tryLockFor(ParallelEnvironment* parent) 85 { 86 bool locked = m_mutex.tryLock(); 87 88 if (!locked) 89 return false; 90 91 if (m_parent) { 92 m_mutex.unlock(); 93 return false; 94 } 95 96 if (!m_threadID) 97 m_threadID = createThread(&ParallelEnvironment::ThreadPrivate::workerThread, this, "Parallel worker"); 98 99 if (m_threadID) 100 m_parent = parent; 101 102 m_mutex.unlock(); 103 return m_threadID; 104 } 105 106 void ParallelEnvironment::ThreadPrivate::execute(ThreadFunction threadFunction, void* parameters) 107 { 108 MutexLocker lock(m_mutex); 109 110 m_threadFunction = threadFunction; 111 m_parameters = parameters; 112 m_running = true; 113 m_threadCondition.signal(); 114 } 115 116 void ParallelEnvironment::ThreadPrivate::waitForFinish() 117 { 118 MutexLocker lock(m_mutex); 119 120 while (m_running) 121 m_threadCondition.wait(m_mutex); 122 } 123 124 void ParallelEnvironment::ThreadPrivate::workerThread(void* threadData) 125 { 126 ThreadPrivate* sharedThread = reinterpret_cast<ThreadPrivate*>(threadData); 127 MutexLocker lock(sharedThread->m_mutex); 128 129 while (sharedThread->m_threadID) { 130 if (sharedThread->m_running) { 131 (*sharedThread->m_threadFunction)(sharedThread->m_parameters); 132 sharedThread->m_running = false; 133 sharedThread->m_parent = 0; 134 sharedThread->m_threadCondition.signal(); 135 } 136 137 sharedThread->m_threadCondition.wait(sharedThread->m_mutex); 138 } 139 } 140 141 } // namespace WTF 142 #endif // ENABLE(THREADING_GENERIC) 143