1 /////////////////////////////////////////////////////////////////////////// 2 // 3 // Copyright (c) 2005, Industrial Light & Magic, a division of Lucas 4 // Digital Ltd. LLC 5 // 6 // All rights reserved. 7 // 8 // Redistribution and use in source and binary forms, with or without 9 // modification, are permitted provided that the following conditions are 10 // met: 11 // * Redistributions of source code must retain the above copyright 12 // notice, this list of conditions and the following disclaimer. 13 // * Redistributions in binary form must reproduce the above 14 // copyright notice, this list of conditions and the following disclaimer 15 // in the documentation and/or other materials provided with the 16 // distribution. 17 // * Neither the name of Industrial Light & Magic nor the names of 18 // its contributors may be used to endorse or promote products derived 19 // from this software without specific prior written permission. 20 // 21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 // 33 /////////////////////////////////////////////////////////////////////////// 34 35 //----------------------------------------------------------------------------- 36 // 37 // class Task, class ThreadPool, class TaskGroup 38 // 39 //----------------------------------------------------------------------------- 40 41 #include "IlmThread.h" 42 #include "IlmThreadMutex.h" 43 #include "IlmThreadSemaphore.h" 44 #include "IlmThreadPool.h" 45 #include "Iex.h" 46 #include <list> 47 48 using namespace std; 49 50 namespace IlmThread { 51 namespace { 52 53 class WorkerThread: public Thread 54 { 55 public: 56 57 WorkerThread (ThreadPool::Data* data); 58 59 virtual void run (); 60 61 private: 62 63 ThreadPool::Data * _data; 64 }; 65 66 } //namespace 67 68 69 struct TaskGroup::Data 70 { 71 Data (); 72 ~Data (); 73 74 void addTask () ; 75 void removeTask (); 76 77 Semaphore isEmpty; // used to signal that the taskgroup is empty 78 int numPending; // number of pending tasks to still execute 79 }; 80 81 82 struct ThreadPool::Data 83 { 84 Data (); 85 ~Data(); 86 87 void finish (); 88 bool stopped () const; 89 void stop (); 90 91 Semaphore taskSemaphore; // threads wait on this for ready tasks 92 Mutex taskMutex; // mutual exclusion for the tasks list 93 list<Task*> tasks; // the list of tasks to execute 94 size_t numTasks; // fast access to list size 95 // (list::size() can be O(n)) 96 97 Semaphore threadSemaphore; // signaled when a thread starts executing 98 Mutex threadMutex; // mutual exclusion for threads list 99 list<WorkerThread*> threads; // the list of all threads 100 size_t numThreads; // fast access to list size 101 102 bool stopping; // flag indicating whether to stop threads 103 Mutex stopMutex; // mutual exclusion for stopping flag 104 }; 105 106 107 108 // 109 // class WorkerThread 110 // 111 112 WorkerThread::WorkerThread (ThreadPool::Data* data): 113 _data (data) 114 { 115 start(); 116 } 117 118 119 void 120 WorkerThread::run () 121 { 122 // 123 // Signal that the thread has started executing 124 // 125 126 _data->threadSemaphore.post(); 127 128 while (true) 129 { 130 // 131 // Wait for a task to become available 132 // 133 134 _data->taskSemaphore.wait(); 135 136 { 137 Lock taskLock (_data->taskMutex); 138 139 // 140 // If there is a task pending, pop off the next task in the FIFO 141 // 142 143 if (_data->numTasks > 0) 144 { 145 Task* task = _data->tasks.front(); 146 TaskGroup* taskGroup = task->group(); 147 _data->tasks.pop_front(); 148 _data->numTasks--; 149 150 taskLock.release(); 151 task->execute(); 152 taskLock.acquire(); 153 154 delete task; 155 taskGroup->_data->removeTask(); 156 } 157 else if (_data->stopped()) 158 { 159 break; 160 } 161 } 162 } 163 } 164 165 166 // 167 // struct TaskGroup::Data 168 // 169 170 TaskGroup::Data::Data (): isEmpty (1), numPending (0) 171 { 172 // empty 173 } 174 175 176 TaskGroup::Data::~Data () 177 { 178 // 179 // A TaskGroup acts like an "inverted" semaphore: if the count 180 // is above 0 then waiting on the taskgroup will block. This 181 // destructor waits until the taskgroup is empty before returning. 182 // 183 184 isEmpty.wait (); 185 } 186 187 188 void 189 TaskGroup::Data::addTask () 190 { 191 // 192 // Any access to the taskgroup is protected by a mutex that is 193 // held by the threadpool. Therefore it is safe to access 194 // numPending before we wait on the semaphore. 195 // 196 197 if (numPending++ == 0) 198 isEmpty.wait (); 199 } 200 201 202 void 203 TaskGroup::Data::removeTask () 204 { 205 if (--numPending == 0) 206 isEmpty.post (); 207 } 208 209 210 // 211 // struct ThreadPool::Data 212 // 213 214 ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false) 215 { 216 // empty 217 } 218 219 220 ThreadPool::Data::~Data() 221 { 222 Lock lock (threadMutex); 223 finish (); 224 } 225 226 227 void 228 ThreadPool::Data::finish () 229 { 230 stop(); 231 232 // 233 // Signal enough times to allow all threads to stop. 234 // 235 // Wait until all threads have started their run functions. 236 // If we do not wait before we destroy the threads then it's 237 // possible that the threads have not yet called their run 238 // functions. 239 // If this happens then the run function will be called off 240 // of an invalid object and we will crash, most likely with 241 // an error like: "pure virtual method called" 242 // 243 244 for (size_t i = 0; i < numThreads; i++) 245 { 246 taskSemaphore.post(); 247 threadSemaphore.wait(); 248 } 249 250 // 251 // Join all the threads 252 // 253 254 for (list<WorkerThread*>::iterator i = threads.begin(); 255 i != threads.end(); 256 ++i) 257 { 258 delete (*i); 259 } 260 261 Lock lock1 (taskMutex); 262 Lock lock2 (stopMutex); 263 threads.clear(); 264 tasks.clear(); 265 numThreads = 0; 266 numTasks = 0; 267 stopping = false; 268 } 269 270 271 bool 272 ThreadPool::Data::stopped () const 273 { 274 Lock lock (stopMutex); 275 return stopping; 276 } 277 278 279 void 280 ThreadPool::Data::stop () 281 { 282 Lock lock (stopMutex); 283 stopping = true; 284 } 285 286 287 // 288 // class Task 289 // 290 291 Task::Task (TaskGroup* g): _group(g) 292 { 293 // empty 294 } 295 296 297 Task::~Task() 298 { 299 // empty 300 } 301 302 303 TaskGroup* 304 Task::group () 305 { 306 return _group; 307 } 308 309 310 TaskGroup::TaskGroup (): 311 _data (new Data()) 312 { 313 // empty 314 } 315 316 317 TaskGroup::~TaskGroup () 318 { 319 delete _data; 320 } 321 322 323 // 324 // class ThreadPool 325 // 326 327 ThreadPool::ThreadPool (unsigned nthreads): 328 _data (new Data()) 329 { 330 setNumThreads (nthreads); 331 } 332 333 334 ThreadPool::~ThreadPool () 335 { 336 delete _data; 337 } 338 339 340 int 341 ThreadPool::numThreads () const 342 { 343 Lock lock (_data->threadMutex); 344 return _data->numThreads; 345 } 346 347 348 void 349 ThreadPool::setNumThreads (int count) 350 { 351 if (count < 0) 352 throw Iex::ArgExc ("Attempt to set the number of threads " 353 "in a thread pool to a negative value."); 354 355 // 356 // Lock access to thread list and size 357 // 358 359 Lock lock (_data->threadMutex); 360 361 if ((size_t)count > _data->numThreads) 362 { 363 // 364 // Add more threads 365 // 366 367 while (_data->numThreads < (size_t)count) 368 { 369 _data->threads.push_back (new WorkerThread (_data)); 370 _data->numThreads++; 371 } 372 } 373 else if ((size_t)count < _data->numThreads) 374 { 375 // 376 // Wait until all existing threads are finished processing, 377 // then delete all threads. 378 // 379 380 _data->finish (); 381 382 // 383 // Add in new threads 384 // 385 386 while (_data->numThreads < (size_t)count) 387 { 388 _data->threads.push_back (new WorkerThread (_data)); 389 _data->numThreads++; 390 } 391 } 392 } 393 394 395 void 396 ThreadPool::addTask (Task* task) 397 { 398 // 399 // Lock the threads, needed to access numThreads 400 // 401 402 Lock lock (_data->threadMutex); 403 404 if (_data->numThreads == 0) 405 { 406 task->execute (); 407 delete task; 408 } 409 else 410 { 411 // 412 // Get exclusive access to the tasks queue 413 // 414 415 { 416 Lock taskLock (_data->taskMutex); 417 418 // 419 // Push the new task into the FIFO 420 // 421 422 _data->tasks.push_back (task); 423 _data->numTasks++; 424 task->group()->_data->addTask(); 425 } 426 427 // 428 // Signal that we have a new task to process 429 // 430 431 _data->taskSemaphore.post (); 432 } 433 } 434 435 436 ThreadPool& 437 ThreadPool::globalThreadPool () 438 { 439 // 440 // The global thread pool 441 // 442 443 static ThreadPool gThreadPool (0); 444 445 return gThreadPool; 446 } 447 448 449 void 450 ThreadPool::addGlobalTask (Task* task) 451 { 452 globalThreadPool().addTask (task); 453 } 454 455 456 } // namespace IlmThread 457