Home | History | Annotate | Download | only in wtf
      1 /*
      2  * Copyright (C) 2011 University of Szeged
      3  * Copyright (C) 2011 Gabor Loki <loki (at) webkit.org>
      4  * All rights reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  *
     15  * THIS SOFTWARE IS PROVIDED BY UNIVERSITY OF SZEGED ``AS IS'' AND ANY
     16  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     18  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL UNIVERSITY OF SZEGED OR
     19  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     20  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     22  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
     23  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     25  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "config.h"
     29 #include "ParallelJobs.h"
     30 
     31 #if ENABLE(THREADING_GENERIC)
     32 
     33 #include "wtf/NumberOfCores.h"
     34 
     35 namespace WTF {
     36 
// Pool of worker-thread records shared by all ParallelEnvironment instances.
// It starts out null; the ParallelEnvironment constructor allocates it on
// first use and appends new ThreadPrivate entries to it on demand.
Vector< RefPtr<ParallelEnvironment::ThreadPrivate> >* ParallelEnvironment::s_threadPool = 0;
     38 
     39 ParallelEnvironment::ParallelEnvironment(ThreadFunction threadFunction, size_t sizeOfParameter, int requestedJobNumber) :
     40     m_threadFunction(threadFunction),
     41     m_sizeOfParameter(sizeOfParameter)
     42 {
     43     ASSERT_ARG(requestedJobNumber, requestedJobNumber >= 1);
     44 
     45     int maxNumberOfCores = numberOfProcessorCores();
     46 
     47     if (!requestedJobNumber || requestedJobNumber > maxNumberOfCores)
     48         requestedJobNumber = static_cast<unsigned>(maxNumberOfCores);
     49 
     50     if (!s_threadPool)
     51         s_threadPool = new Vector< RefPtr<ThreadPrivate> >();
     52 
     53     // The main thread should be also a worker.
     54     int maxNumberOfNewThreads = requestedJobNumber - 1;
     55 
     56     for (int i = 0; i < maxNumberOfCores && m_threads.size() < static_cast<unsigned>(maxNumberOfNewThreads); ++i) {
     57         if (s_threadPool->size() < static_cast<unsigned>(i) + 1U)
     58             s_threadPool->append(ThreadPrivate::create());
     59 
     60         if ((*s_threadPool)[i]->tryLockFor(this))
     61             m_threads.append((*s_threadPool)[i]);
     62     }
     63 
     64     m_numberOfJobs = m_threads.size() + 1;
     65 }
     66 
     67 void ParallelEnvironment::execute(void* parameters)
     68 {
     69     unsigned char* currentParameter = static_cast<unsigned char*>(parameters);
     70     size_t i;
     71     for (i = 0; i < m_threads.size(); ++i) {
     72         m_threads[i]->execute(m_threadFunction, currentParameter);
     73         currentParameter += m_sizeOfParameter;
     74     }
     75 
     76     // The work for the main thread.
     77     (*m_threadFunction)(currentParameter);
     78 
     79     // Wait until all jobs are done.
     80     for (i = 0; i < m_threads.size(); ++i)
     81         m_threads[i]->waitForFinish();
     82 }
     83 
     84 bool ParallelEnvironment::ThreadPrivate::tryLockFor(ParallelEnvironment* parent)
     85 {
     86     bool locked = m_mutex.tryLock();
     87 
     88     if (!locked)
     89         return false;
     90 
     91     if (m_parent) {
     92         m_mutex.unlock();
     93         return false;
     94     }
     95 
     96     if (!m_threadID)
     97         m_threadID = createThread(&ParallelEnvironment::ThreadPrivate::workerThread, this, "Parallel worker");
     98 
     99     if (m_threadID)
    100         m_parent = parent;
    101 
    102     m_mutex.unlock();
    103     return m_threadID;
    104 }
    105 
    106 void ParallelEnvironment::ThreadPrivate::execute(ThreadFunction threadFunction, void* parameters)
    107 {
    108     MutexLocker lock(m_mutex);
    109 
    110     m_threadFunction = threadFunction;
    111     m_parameters = parameters;
    112     m_running = true;
    113     m_threadCondition.signal();
    114 }
    115 
    116 void ParallelEnvironment::ThreadPrivate::waitForFinish()
    117 {
    118     MutexLocker lock(m_mutex);
    119 
    120     while (m_running)
    121         m_threadCondition.wait(m_mutex);
    122 }
    123 
    124 void ParallelEnvironment::ThreadPrivate::workerThread(void* threadData)
    125 {
    126     ThreadPrivate* sharedThread = reinterpret_cast<ThreadPrivate*>(threadData);
    127     MutexLocker lock(sharedThread->m_mutex);
    128 
    129     while (sharedThread->m_threadID) {
    130         if (sharedThread->m_running) {
    131             (*sharedThread->m_threadFunction)(sharedThread->m_parameters);
    132             sharedThread->m_running = false;
    133             sharedThread->m_parent = 0;
    134             sharedThread->m_threadCondition.signal();
    135         }
    136 
    137         sharedThread->m_threadCondition.wait(sharedThread->m_mutex);
    138     }
    139 }
    140 
    141 } // namespace WTF
    142 #endif // ENABLE(THREADING_GENERIC)
    143