1 /*M/////////////////////////////////////////////////////////////////////////////////////// 2 // 3 // IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. 4 // 5 // By downloading, copying, installing or using the software you agree to this license. 6 // If you do not agree to this license, do not download, install, 7 // copy or use the software. 8 // 9 // 10 // License Agreement 11 // For Open Source Computer Vision Library 12 // 13 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved. 14 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved. 15 // Third party copyrights are property of their respective owners. 16 // 17 // Redistribution and use in source and binary forms, with or without modification, 18 // are permitted provided that the following conditions are met: 19 // 20 // * Redistribution's of source code must retain the above copyright notice, 21 // this list of conditions and the following disclaimer. 22 // 23 // * Redistribution's in binary form must reproduce the above copyright notice, 24 // this list of conditions and the following disclaimer in the documentation 25 // and/or other materials provided with the distribution. 26 // 27 // * The name of the copyright holders may not be used to endorse or promote products 28 // derived from this software without specific prior written permission. 29 // 30 // This software is provided by the copyright holders and contributors "as is" and 31 // any express or implied warranties, including, but not limited to, the implied 32 // warranties of merchantability and fitness for a particular purpose are disclaimed. 33 // In no event shall the Intel Corporation or contributors be liable for any direct, 34 // indirect, incidental, special, exemplary, or consequential damages 35 // (including, but not limited to, procurement of substitute goods or services; 36 // loss of use, data, or profits; or business interruption) however caused 37 // and on any theory of liability, whether in contract, strict liability, 38 // or tort (including negligence or otherwise) arising in any way out of 39 // the use of this software, even if advised of the possibility of such damage. 40 // 41 //M*/ 42 43 #include "precomp.hpp" 44 45 #if defined HAVE_PTHREADS && HAVE_PTHREADS 46 47 #include <algorithm> 48 #include <pthread.h> 49 50 namespace cv 51 { 52 53 class ThreadManager; 54 55 enum ForThreadState 56 { 57 eFTNotStarted = 0, 58 eFTStarted = 1, 59 eFTToStop = 2, 60 eFTStoped = 3 61 }; 62 63 enum ThreadManagerPoolState 64 { 65 eTMNotInited = 0, 66 eTMFailedToInit = 1, 67 eTMInited = 2, 68 eTMSingleThreaded = 3 69 }; 70 71 struct work_load 72 { 73 work_load() 74 { 75 clear(); 76 } 77 78 work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes) 79 { 80 set(range, body, nstripes); 81 } 82 83 void set(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes) 84 { 85 m_body = &body; 86 m_range = ⦥ 87 m_nstripes = nstripes; 88 m_blocks_count = ((m_range->end - m_range->start - 1)/m_nstripes) + 1; 89 } 90 91 const cv::ParallelLoopBody* m_body; 92 const cv::Range* m_range; 93 int m_nstripes; 94 unsigned int m_blocks_count; 95 96 void clear() 97 { 98 m_body = 0; 99 m_range = 0; 100 m_nstripes = 0; 101 m_blocks_count = 0; 102 } 103 }; 104 105 class ForThread 106 { 107 public: 108 109 ForThread(): m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0) 110 { 111 } 112 113 //called from manager thread 114 bool init(size_t id, ThreadManager* parent); 115 116 //called from manager thread 117 void run(); 118 119 //called from manager thread 120 void stop(); 121 122 ~ForThread(); 123 124 private: 125 126 //called from worker thread 127 static void* thread_loop_wrapper(void* thread_object); 128 129 //called from worker thread 130 void execute(); 131 132 //called from worker thread 133 void thread_body(); 134 135 pthread_t m_posix_thread; 136 pthread_mutex_t m_thread_mutex; 137 pthread_cond_t m_cond_thread_task; 138 bool m_task_start; 139 140 ThreadManager* m_parent; 141 ForThreadState m_state; 142 size_t m_id; 143 }; 144 145 class ThreadManager 146 { 147 public: 148 friend class ForThread; 149 150 static ThreadManager& instance() 151 { 152 if(!m_instance.ptr) 153 { 154 pthread_mutex_lock(&m_manager_access_mutex); 155 156 if(!m_instance.ptr) 157 { 158 m_instance.ptr = new ThreadManager(); 159 } 160 161 pthread_mutex_unlock(&m_manager_access_mutex); 162 } 163 164 return *m_instance.ptr; 165 } 166 167 168 static void stop() 169 { 170 ThreadManager& manager = instance(); 171 172 if(manager.m_pool_state == eTMInited) 173 { 174 for(size_t i = 0; i < manager.m_num_threads; ++i) 175 { 176 manager.m_threads[i].stop(); 177 } 178 } 179 180 manager.m_pool_state = eTMNotInited; 181 } 182 183 void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); 184 185 size_t getNumOfThreads(); 186 187 void setNumOfThreads(size_t n); 188 189 private: 190 191 struct ptr_holder 192 { 193 ThreadManager* ptr; 194 195 ptr_holder(): ptr(NULL) { } 196 197 ~ptr_holder() 198 { 199 if(ptr) 200 { 201 delete ptr; 202 } 203 } 204 }; 205 206 ThreadManager(); 207 208 ~ThreadManager(); 209 210 void wait_complete(); 211 212 void notify_complete(); 213 214 bool initPool(); 215 216 size_t defaultNumberOfThreads(); 217 218 std::vector<ForThread> m_threads; 219 size_t m_num_threads; 220 221 pthread_mutex_t m_manager_task_mutex; 222 pthread_cond_t m_cond_thread_task_complete; 223 bool m_task_complete; 224 225 unsigned int m_task_position; 226 unsigned int m_num_of_completed_tasks; 227 228 static pthread_mutex_t m_manager_access_mutex; 229 static ptr_holder m_instance; 230 231 static const char m_env_name[]; 232 static const unsigned int m_default_number_of_threads; 233 234 work_load m_work_load; 235 236 struct work_thread_t 237 { 238 work_thread_t(): value(false) { } 239 bool value; 240 }; 241 242 cv::TLSData<work_thread_t> m_is_work_thread; 243 244 ThreadManagerPoolState m_pool_state; 245 }; 246 247 #ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP 248 #define PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP PTHREAD_RECURSIVE_MUTEX_INITIALIZER 249 #endif 250 251 pthread_mutex_t ThreadManager::m_manager_access_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; 252 253 ThreadManager::ptr_holder ThreadManager::m_instance; 254 const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM"; 255 256 #ifdef ANDROID 257 // many modern phones/tables have 4-core CPUs. Let's use no more 258 // than 2 threads by default not to overheat the devices 259 const unsigned int ThreadManager::m_default_number_of_threads = 2; 260 #else 261 const unsigned int ThreadManager::m_default_number_of_threads = 8; 262 #endif 263 264 ForThread::~ForThread() 265 { 266 if(m_state == eFTStarted) 267 { 268 stop(); 269 270 pthread_mutex_destroy(&m_thread_mutex); 271 272 pthread_cond_destroy(&m_cond_thread_task); 273 } 274 } 275 276 bool ForThread::init(size_t id, ThreadManager* parent) 277 { 278 m_id = id; 279 280 m_parent = parent; 281 282 int res = 0; 283 284 res |= pthread_mutex_init(&m_thread_mutex, NULL); 285 286 res |= pthread_cond_init(&m_cond_thread_task, NULL); 287 288 if(!res) 289 { 290 res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this); 291 } 292 293 294 return res == 0; 295 } 296 297 void ForThread::stop() 298 { 299 if(m_state == eFTStarted) 300 { 301 m_state = eFTToStop; 302 303 run(); 304 305 pthread_join(m_posix_thread, NULL); 306 } 307 308 m_state = eFTStoped; 309 } 310 311 void ForThread::run() 312 { 313 pthread_mutex_lock(&m_thread_mutex); 314 315 m_task_start = true; 316 317 pthread_cond_signal(&m_cond_thread_task); 318 319 pthread_mutex_unlock(&m_thread_mutex); 320 } 321 322 void* ForThread::thread_loop_wrapper(void* thread_object) 323 { 324 ((ForThread*)thread_object)->thread_body(); 325 return 0; 326 } 327 328 void ForThread::execute() 329 { 330 unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1); 331 332 work_load& load = m_parent->m_work_load; 333 334 while(m_current_pos < load.m_blocks_count) 335 { 336 int start = load.m_range->start + m_current_pos*load.m_nstripes; 337 int end = std::min(start + load.m_nstripes, load.m_range->end); 338 339 load.m_body->operator()(cv::Range(start, end)); 340 341 m_current_pos = CV_XADD(&m_parent->m_task_position, 1); 342 } 343 } 344 345 void ForThread::thread_body() 346 { 347 m_parent->m_is_work_thread.get()->value = true; 348 349 pthread_mutex_lock(&m_thread_mutex); 350 351 m_state = eFTStarted; 352 353 while(m_state == eFTStarted) 354 { 355 //to handle spurious wakeups 356 while( !m_task_start && m_state != eFTToStop ) 357 pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex); 358 359 if(m_state == eFTStarted) 360 { 361 execute(); 362 363 m_task_start = false; 364 365 m_parent->notify_complete(); 366 } 367 } 368 369 pthread_mutex_unlock(&m_thread_mutex); 370 } 371 372 ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited) 373 { 374 int res = 0; 375 376 res |= pthread_mutex_init(&m_manager_task_mutex, NULL); 377 378 res |= pthread_cond_init(&m_cond_thread_task_complete, NULL); 379 380 if(!res) 381 { 382 setNumOfThreads(defaultNumberOfThreads()); 383 384 m_task_position = 0; 385 } 386 else 387 { 388 m_num_threads = 1; 389 m_pool_state = eTMFailedToInit; 390 m_task_position = 0; 391 392 //print error; 393 } 394 } 395 396 ThreadManager::~ThreadManager() 397 { 398 stop(); 399 400 pthread_mutex_destroy(&m_manager_task_mutex); 401 402 pthread_cond_destroy(&m_cond_thread_task_complete); 403 404 pthread_mutex_destroy(&m_manager_access_mutex); 405 } 406 407 void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes) 408 { 409 bool is_work_thread = m_is_work_thread.get()->value; 410 411 if( (getNumOfThreads() > 1) && !is_work_thread && 412 (range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) ) 413 { 414 int res = pthread_mutex_trylock(&m_manager_access_mutex); 415 416 if(!res) 417 { 418 if(initPool()) 419 { 420 double min_stripes = double(range.end - range.start)/(4*m_threads.size()); 421 422 nstripes = std::max(nstripes, min_stripes); 423 424 pthread_mutex_lock(&m_manager_task_mutex); 425 426 m_num_of_completed_tasks = 0; 427 428 m_task_position = 0; 429 430 m_task_complete = false; 431 432 m_work_load.set(range, body, std::ceil(nstripes)); 433 434 for(size_t i = 0; i < m_threads.size(); ++i) 435 { 436 m_threads[i].run(); 437 } 438 439 wait_complete(); 440 } 441 else 442 { 443 //print error 444 body(range); 445 } 446 } 447 else 448 { 449 body(range); 450 } 451 } 452 else 453 { 454 body(range); 455 } 456 } 457 458 void ThreadManager::wait_complete() 459 { 460 //to handle spurious wakeups 461 while(!m_task_complete) 462 pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex); 463 464 pthread_mutex_unlock(&m_manager_task_mutex); 465 466 pthread_mutex_unlock(&m_manager_access_mutex); 467 } 468 469 void ThreadManager::notify_complete() 470 { 471 472 unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1); 473 474 if(comp == (m_num_threads - 1)) 475 { 476 pthread_mutex_lock(&m_manager_task_mutex); 477 478 m_task_complete = true; 479 480 pthread_cond_signal(&m_cond_thread_task_complete); 481 482 pthread_mutex_unlock(&m_manager_task_mutex); 483 } 484 } 485 486 bool ThreadManager::initPool() 487 { 488 if(m_pool_state != eTMNotInited || m_num_threads == 1) 489 return true; 490 491 m_threads.resize(m_num_threads); 492 493 bool res = true; 494 495 for(size_t i = 0; i < m_threads.size(); ++i) 496 { 497 res |= m_threads[i].init(i, this); 498 } 499 500 if(res) 501 { 502 m_pool_state = eTMInited; 503 } 504 else 505 { 506 //TODO: join threads? 507 m_pool_state = eTMFailedToInit; 508 } 509 510 return res; 511 } 512 513 size_t ThreadManager::getNumOfThreads() 514 { 515 return m_num_threads; 516 } 517 518 void ThreadManager::setNumOfThreads(size_t n) 519 { 520 int res = pthread_mutex_lock(&m_manager_access_mutex); 521 522 if(!res) 523 { 524 if(n == 0) 525 { 526 n = defaultNumberOfThreads(); 527 } 528 529 if(n != m_num_threads && m_pool_state != eTMFailedToInit) 530 { 531 if(m_pool_state == eTMInited) 532 { 533 stop(); 534 m_threads.clear(); 535 } 536 537 m_num_threads = n; 538 539 if(m_num_threads == 1) 540 { 541 m_pool_state = eTMSingleThreaded; 542 } 543 else 544 { 545 m_pool_state = eTMNotInited; 546 } 547 } 548 549 pthread_mutex_unlock(&m_manager_access_mutex); 550 } 551 } 552 553 size_t ThreadManager::defaultNumberOfThreads() 554 { 555 unsigned int result = m_default_number_of_threads; 556 557 char * env = getenv(m_env_name); 558 559 if(env != NULL) 560 { 561 sscanf(env, "%u", &result); 562 563 result = std::max(1u, result); 564 //do we need upper limit of threads number? 565 } 566 567 return result; 568 } 569 570 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); 571 size_t parallel_pthreads_get_threads_num(); 572 void parallel_pthreads_set_threads_num(int num); 573 574 size_t parallel_pthreads_get_threads_num() 575 { 576 return ThreadManager::instance().getNumOfThreads(); 577 } 578 579 void parallel_pthreads_set_threads_num(int num) 580 { 581 if(num < 0) 582 { 583 ThreadManager::instance().setNumOfThreads(0); 584 } 585 else 586 { 587 ThreadManager::instance().setNumOfThreads(size_t(num)); 588 } 589 } 590 591 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes) 592 { 593 ThreadManager::instance().run(range, body, nstripes); 594 } 595 596 } 597 598 #endif 599