Home | History | Annotate | Download | only in IlmThread
      1 ///////////////////////////////////////////////////////////////////////////
      2 //
      3 // Copyright (c) 2005, Industrial Light & Magic, a division of Lucas
      4 // Digital Ltd. LLC
      5 //
      6 // All rights reserved.
      7 //
      8 // Redistribution and use in source and binary forms, with or without
      9 // modification, are permitted provided that the following conditions are
     10 // met:
     11 // *       Redistributions of source code must retain the above copyright
     12 // notice, this list of conditions and the following disclaimer.
     13 // *       Redistributions in binary form must reproduce the above
     14 // copyright notice, this list of conditions and the following disclaimer
     15 // in the documentation and/or other materials provided with the
     16 // distribution.
     17 // *       Neither the name of Industrial Light & Magic nor the names of
     18 // its contributors may be used to endorse or promote products derived
     19 // from this software without specific prior written permission.
     20 //
     21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     32 //
     33 ///////////////////////////////////////////////////////////////////////////
     34 
     35 //-----------------------------------------------------------------------------
     36 //
     37 //	class Task, class ThreadPool, class TaskGroup
     38 //
     39 //-----------------------------------------------------------------------------
     40 
     41 #include "IlmThread.h"
     42 #include "IlmThreadMutex.h"
     43 #include "IlmThreadSemaphore.h"
     44 #include "IlmThreadPool.h"
     45 #include "Iex.h"
     46 #include <list>
     47 
     48 using namespace std;
     49 
     50 namespace IlmThread {
     51 namespace {
     52 
     53 class WorkerThread: public Thread
     54 {
     55   public:
     56 
     57     WorkerThread (ThreadPool::Data* data);
     58 
     59     virtual void	run ();
     60 
     61   private:
     62 
     63     ThreadPool::Data *	_data;
     64 };
     65 
     66 } //namespace
     67 
     68 
     69 struct TaskGroup::Data
     70 {
     71      Data ();
     72     ~Data ();
     73 
     74     void	addTask () ;
     75     void	removeTask ();
     76 
     77     Semaphore	isEmpty;	// used to signal that the taskgroup is empty
     78     int		numPending;	// number of pending tasks to still execute
     79 };
     80 
     81 
     82 struct ThreadPool::Data
     83 {
     84      Data ();
     85     ~Data();
     86 
     87     void	finish ();
     88     bool	stopped () const;
     89     void	stop ();
     90 
     91     Semaphore taskSemaphore;        // threads wait on this for ready tasks
     92     Mutex taskMutex;                // mutual exclusion for the tasks list
     93     list<Task*> tasks;              // the list of tasks to execute
     94     size_t numTasks;                // fast access to list size
     95                                     //   (list::size() can be O(n))
     96 
     97     Semaphore threadSemaphore;      // signaled when a thread starts executing
     98     Mutex threadMutex;              // mutual exclusion for threads list
     99     list<WorkerThread*> threads;    // the list of all threads
    100     size_t numThreads;              // fast access to list size
    101 
    102     bool stopping;                  // flag indicating whether to stop threads
    103     Mutex stopMutex;                // mutual exclusion for stopping flag
    104 };
    105 
    106 
    107 
    108 //
    109 // class WorkerThread
    110 //
    111 
    112 WorkerThread::WorkerThread (ThreadPool::Data* data):
    113     _data (data)
    114 {
    115     start();
    116 }
    117 
    118 
    119 void
    120 WorkerThread::run ()
    121 {
    122     //
    123     // Signal that the thread has started executing
    124     //
    125 
    126     _data->threadSemaphore.post();
    127 
    128     while (true)
    129     {
    130     //
    131         // Wait for a task to become available
    132     //
    133 
    134         _data->taskSemaphore.wait();
    135 
    136         {
    137             Lock taskLock (_data->taskMutex);
    138 
    139         //
    140             // If there is a task pending, pop off the next task in the FIFO
    141         //
    142 
    143             if (_data->numTasks > 0)
    144             {
    145                 Task* task = _data->tasks.front();
    146         TaskGroup* taskGroup = task->group();
    147                 _data->tasks.pop_front();
    148                 _data->numTasks--;
    149 
    150                 taskLock.release();
    151                 task->execute();
    152                 taskLock.acquire();
    153 
    154                 delete task;
    155                 taskGroup->_data->removeTask();
    156             }
    157             else if (_data->stopped())
    158         {
    159                 break;
    160         }
    161         }
    162     }
    163 }
    164 
    165 
    166 //
    167 // struct TaskGroup::Data
    168 //
    169 
    170 TaskGroup::Data::Data (): isEmpty (1), numPending (0)
    171 {
    172     // empty
    173 }
    174 
    175 
    176 TaskGroup::Data::~Data ()
    177 {
    178     //
    179     // A TaskGroup acts like an "inverted" semaphore: if the count
    180     // is above 0 then waiting on the taskgroup will block.  This
    181     // destructor waits until the taskgroup is empty before returning.
    182     //
    183 
    184     isEmpty.wait ();
    185 }
    186 
    187 
    188 void
    189 TaskGroup::Data::addTask ()
    190 {
    191     //
    192     // Any access to the taskgroup is protected by a mutex that is
    193     // held by the threadpool.  Therefore it is safe to access
    194     // numPending before we wait on the semaphore.
    195     //
    196 
    197     if (numPending++ == 0)
    198     isEmpty.wait ();
    199 }
    200 
    201 
    202 void
    203 TaskGroup::Data::removeTask ()
    204 {
    205     if (--numPending == 0)
    206     isEmpty.post ();
    207 }
    208 
    209 
    210 //
    211 // struct ThreadPool::Data
    212 //
    213 
    214 ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
    215 {
    216     // empty
    217 }
    218 
    219 
    220 ThreadPool::Data::~Data()
    221 {
    222     Lock lock (threadMutex);
    223     finish ();
    224 }
    225 
    226 
    227 void
    228 ThreadPool::Data::finish ()
    229 {
    230     stop();
    231 
    232     //
    233     // Signal enough times to allow all threads to stop.
    234     //
    235     // Wait until all threads have started their run functions.
    236     // If we do not wait before we destroy the threads then it's
    237     // possible that the threads have not yet called their run
    238     // functions.
    239     // If this happens then the run function will be called off
    240     // of an invalid object and we will crash, most likely with
    241     // an error like: "pure virtual method called"
    242     //
    243 
    244     for (size_t i = 0; i < numThreads; i++)
    245     {
    246     taskSemaphore.post();
    247     threadSemaphore.wait();
    248     }
    249 
    250     //
    251     // Join all the threads
    252     //
    253 
    254     for (list<WorkerThread*>::iterator i = threads.begin();
    255      i != threads.end();
    256      ++i)
    257     {
    258     delete (*i);
    259     }
    260 
    261     Lock lock1 (taskMutex);
    262     Lock lock2 (stopMutex);
    263     threads.clear();
    264     tasks.clear();
    265     numThreads = 0;
    266     numTasks = 0;
    267     stopping = false;
    268 }
    269 
    270 
    271 bool
    272 ThreadPool::Data::stopped () const
    273 {
    274     Lock lock (stopMutex);
    275     return stopping;
    276 }
    277 
    278 
    279 void
    280 ThreadPool::Data::stop ()
    281 {
    282     Lock lock (stopMutex);
    283     stopping = true;
    284 }
    285 
    286 
    287 //
    288 // class Task
    289 //
    290 
    291 Task::Task (TaskGroup* g): _group(g)
    292 {
    293     // empty
    294 }
    295 
    296 
    297 Task::~Task()
    298 {
    299     // empty
    300 }
    301 
    302 
    303 TaskGroup*
    304 Task::group ()
    305 {
    306     return _group;
    307 }
    308 
    309 
    310 TaskGroup::TaskGroup ():
    311     _data (new Data())
    312 {
    313     // empty
    314 }
    315 
    316 
    317 TaskGroup::~TaskGroup ()
    318 {
    319     delete _data;
    320 }
    321 
    322 
    323 //
    324 // class ThreadPool
    325 //
    326 
    327 ThreadPool::ThreadPool (unsigned nthreads):
    328     _data (new Data())
    329 {
    330     setNumThreads (nthreads);
    331 }
    332 
    333 
    334 ThreadPool::~ThreadPool ()
    335 {
    336     delete _data;
    337 }
    338 
    339 
    340 int
    341 ThreadPool::numThreads () const
    342 {
    343     Lock lock (_data->threadMutex);
    344     return _data->numThreads;
    345 }
    346 
    347 
    348 void
    349 ThreadPool::setNumThreads (int count)
    350 {
    351     if (count < 0)
    352         throw Iex::ArgExc ("Attempt to set the number of threads "
    353                "in a thread pool to a negative value.");
    354 
    355     //
    356     // Lock access to thread list and size
    357     //
    358 
    359     Lock lock (_data->threadMutex);
    360 
    361     if ((size_t)count > _data->numThreads)
    362     {
    363     //
    364         // Add more threads
    365     //
    366 
    367         while (_data->numThreads < (size_t)count)
    368         {
    369             _data->threads.push_back (new WorkerThread (_data));
    370             _data->numThreads++;
    371         }
    372     }
    373     else if ((size_t)count < _data->numThreads)
    374     {
    375     //
    376     // Wait until all existing threads are finished processing,
    377     // then delete all threads.
    378     //
    379 
    380         _data->finish ();
    381 
    382     //
    383         // Add in new threads
    384     //
    385 
    386         while (_data->numThreads < (size_t)count)
    387         {
    388             _data->threads.push_back (new WorkerThread (_data));
    389             _data->numThreads++;
    390         }
    391     }
    392 }
    393 
    394 
    395 void
    396 ThreadPool::addTask (Task* task)
    397 {
    398     //
    399     // Lock the threads, needed to access numThreads
    400     //
    401 
    402     Lock lock (_data->threadMutex);
    403 
    404     if (_data->numThreads == 0)
    405     {
    406         task->execute ();
    407         delete task;
    408     }
    409     else
    410     {
    411     //
    412         // Get exclusive access to the tasks queue
    413     //
    414 
    415         {
    416             Lock taskLock (_data->taskMutex);
    417 
    418         //
    419             // Push the new task into the FIFO
    420         //
    421 
    422             _data->tasks.push_back (task);
    423             _data->numTasks++;
    424             task->group()->_data->addTask();
    425         }
    426 
    427     //
    428         // Signal that we have a new task to process
    429     //
    430 
    431         _data->taskSemaphore.post ();
    432     }
    433 }
    434 
    435 
    436 ThreadPool&
    437 ThreadPool::globalThreadPool ()
    438 {
    439     //
    440     // The global thread pool
    441     //
    442 
    443     static ThreadPool gThreadPool (0);
    444 
    445     return gThreadPool;
    446 }
    447 
    448 
    449 void
    450 ThreadPool::addGlobalTask (Task* task)
    451 {
    452     globalThreadPool().addTask (task);
    453 }
    454 
    455 
    456 } // namespace IlmThread
    457