Home | History | Annotate | Download | only in src
      1 /*M///////////////////////////////////////////////////////////////////////////////////////
      2 //
      3 //  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
      4 //
      5 //  By downloading, copying, installing or using the software you agree to this license.
      6 //  If you do not agree to this license, do not download, install,
      7 //  copy or use the software.
      8 //
      9 //
     10 //                           License Agreement
     11 //                For Open Source Computer Vision Library
     12 //
     13 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
     14 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
     15 // Third party copyrights are property of their respective owners.
     16 //
     17 // Redistribution and use in source and binary forms, with or without modification,
     18 // are permitted provided that the following conditions are met:
     19 //
     20 //   * Redistribution's of source code must retain the above copyright notice,
     21 //     this list of conditions and the following disclaimer.
     22 //
     23 //   * Redistribution's in binary form must reproduce the above copyright notice,
     24 //     this list of conditions and the following disclaimer in the documentation
     25 //     and/or other materials provided with the distribution.
     26 //
     27 //   * The name of the copyright holders may not be used to endorse or promote products
     28 //     derived from this software without specific prior written permission.
     29 //
     30 // This software is provided by the copyright holders and contributors "as is" and
     31 // any express or implied warranties, including, but not limited to, the implied
     32 // warranties of merchantability and fitness for a particular purpose are disclaimed.
     33 // In no event shall the Intel Corporation or contributors be liable for any direct,
     34 // indirect, incidental, special, exemplary, or consequential damages
     35 // (including, but not limited to, procurement of substitute goods or services;
     36 // loss of use, data, or profits; or business interruption) however caused
     37 // and on any theory of liability, whether in contract, strict liability,
     38 // or tort (including negligence or otherwise) arising in any way out of
     39 // the use of this software, even if advised of the possibility of such damage.
     40 //
     41 //M*/
     42 
     43 #include "precomp.hpp"
     44 
     45 #if defined HAVE_PTHREADS && HAVE_PTHREADS
     46 
     47 #include <algorithm>
     48 #include <pthread.h>
     49 
     50 namespace cv
     51 {
     52 
     53 class ThreadManager;
     54 
     55 enum ForThreadState
     56 {
     57     eFTNotStarted = 0,
     58     eFTStarted = 1,
     59     eFTToStop = 2,
     60     eFTStoped = 3
     61 };
     62 
     63 enum ThreadManagerPoolState
     64 {
     65     eTMNotInited = 0,
     66     eTMFailedToInit = 1,
     67     eTMInited = 2,
     68     eTMSingleThreaded = 3
     69 };
     70 
     71 struct work_load
     72 {
     73     work_load()
     74     {
     75         clear();
     76     }
     77 
     78     work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
     79     {
     80         set(range, body, nstripes);
     81     }
     82 
     83     void set(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
     84     {
     85         m_body = &body;
     86         m_range = &range;
     87         m_nstripes = nstripes;
     88         m_blocks_count = ((m_range->end - m_range->start - 1)/m_nstripes) + 1;
     89     }
     90 
     91     const cv::ParallelLoopBody* m_body;
     92     const cv::Range*            m_range;
     93     int                         m_nstripes;
     94     unsigned int                m_blocks_count;
     95 
     96     void clear()
     97     {
     98         m_body = 0;
     99         m_range = 0;
    100         m_nstripes = 0;
    101         m_blocks_count = 0;
    102     }
    103 };
    104 
    105 class ForThread
    106 {
    107 public:
    108 
    109     ForThread(): m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0)
    110     {
    111     }
    112 
    113     //called from manager thread
    114     bool init(size_t id, ThreadManager* parent);
    115 
    116     //called from manager thread
    117     void run();
    118 
    119     //called from manager thread
    120     void stop();
    121 
    122     ~ForThread();
    123 
    124 private:
    125 
    126     //called from worker thread
    127     static void* thread_loop_wrapper(void* thread_object);
    128 
    129     //called from worker thread
    130     void execute();
    131 
    132     //called from worker thread
    133     void thread_body();
    134 
    135     pthread_t       m_posix_thread;
    136     pthread_mutex_t m_thread_mutex;
    137     pthread_cond_t  m_cond_thread_task;
    138     bool            m_task_start;
    139 
    140     ThreadManager*  m_parent;
    141     ForThreadState  m_state;
    142     size_t          m_id;
    143 };
    144 
    145 class ThreadManager
    146 {
    147 public:
    148     friend class ForThread;
    149 
    150     static ThreadManager& instance()
    151     {
    152         if(!m_instance.ptr)
    153         {
    154             pthread_mutex_lock(&m_manager_access_mutex);
    155 
    156             if(!m_instance.ptr)
    157             {
    158                 m_instance.ptr = new ThreadManager();
    159             }
    160 
    161             pthread_mutex_unlock(&m_manager_access_mutex);
    162         }
    163 
    164         return *m_instance.ptr;
    165     }
    166 
    167 
    168     static void stop()
    169     {
    170         ThreadManager& manager = instance();
    171 
    172         if(manager.m_pool_state == eTMInited)
    173         {
    174             for(size_t i = 0; i < manager.m_num_threads; ++i)
    175             {
    176                 manager.m_threads[i].stop();
    177             }
    178         }
    179 
    180         manager.m_pool_state = eTMNotInited;
    181     }
    182 
    183     void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
    184 
    185     size_t getNumOfThreads();
    186 
    187     void setNumOfThreads(size_t n);
    188 
    189 private:
    190 
    191     struct ptr_holder
    192     {
    193         ThreadManager* ptr;
    194 
    195         ptr_holder(): ptr(NULL) { }
    196 
    197         ~ptr_holder()
    198         {
    199             if(ptr)
    200             {
    201                 delete ptr;
    202             }
    203         }
    204     };
    205 
    206     ThreadManager();
    207 
    208     ~ThreadManager();
    209 
    210     void wait_complete();
    211 
    212     void notify_complete();
    213 
    214     bool initPool();
    215 
    216     size_t defaultNumberOfThreads();
    217 
    218     std::vector<ForThread> m_threads;
    219     size_t m_num_threads;
    220 
    221     pthread_mutex_t m_manager_task_mutex;
    222     pthread_cond_t  m_cond_thread_task_complete;
    223     bool            m_task_complete;
    224 
    225     unsigned int m_task_position;
    226     unsigned int m_num_of_completed_tasks;
    227 
    228     static pthread_mutex_t m_manager_access_mutex;
    229     static ptr_holder m_instance;
    230 
    231     static const char m_env_name[];
    232     static const unsigned int m_default_number_of_threads;
    233 
    234     work_load m_work_load;
    235 
    236     struct work_thread_t
    237     {
    238         work_thread_t(): value(false) { }
    239         bool value;
    240     };
    241 
    242     cv::TLSData<work_thread_t> m_is_work_thread;
    243 
    244     ThreadManagerPoolState m_pool_state;
    245 };
    246 
// Fallback for toolchains that do not define the _NP spelling of the recursive
// mutex initializer (presumably they provide the unsuffixed name instead).
#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
#define PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP PTHREAD_RECURSIVE_MUTEX_INITIALIZER
#endif

// The access mutex is recursive: instance() constructs the manager while holding
// it, and the constructor locks it again through setNumOfThreads().
pthread_mutex_t ThreadManager::m_manager_access_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

ThreadManager::ptr_holder ThreadManager::m_instance;
// Environment variable read by defaultNumberOfThreads() to override the default.
const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM";

#ifdef ANDROID
// many modern phones/tablets have 4-core CPUs. Let's use no more
// than 2 threads by default so as not to overheat the devices
const unsigned int ThreadManager::m_default_number_of_threads = 2;
#else
const unsigned int ThreadManager::m_default_number_of_threads = 8;
#endif
    263 
    264 ForThread::~ForThread()
    265 {
    266     if(m_state == eFTStarted)
    267     {
    268         stop();
    269 
    270         pthread_mutex_destroy(&m_thread_mutex);
    271 
    272         pthread_cond_destroy(&m_cond_thread_task);
    273     }
    274 }
    275 
    276 bool ForThread::init(size_t id, ThreadManager* parent)
    277 {
    278     m_id = id;
    279 
    280     m_parent = parent;
    281 
    282     int res = 0;
    283 
    284     res |= pthread_mutex_init(&m_thread_mutex, NULL);
    285 
    286     res |= pthread_cond_init(&m_cond_thread_task, NULL);
    287 
    288     if(!res)
    289     {
    290         res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this);
    291     }
    292 
    293 
    294     return res == 0;
    295 }
    296 
    297 void ForThread::stop()
    298 {
    299     if(m_state == eFTStarted)
    300     {
    301         m_state = eFTToStop;
    302 
    303         run();
    304 
    305         pthread_join(m_posix_thread, NULL);
    306     }
    307 
    308     m_state = eFTStoped;
    309 }
    310 
// Marks a task as pending for this worker and wakes it up.
// The flag is set and the condition signalled while m_thread_mutex is held,
// the same mutex the worker passes to pthread_cond_wait() in thread_body().
void ForThread::run()
{
    pthread_mutex_lock(&m_thread_mutex);

    m_task_start = true;

    pthread_cond_signal(&m_cond_thread_task);

    pthread_mutex_unlock(&m_thread_mutex);
}
    321 
    322 void* ForThread::thread_loop_wrapper(void* thread_object)
    323 {
    324     ((ForThread*)thread_object)->thread_body();
    325     return 0;
    326 }
    327 
    328 void ForThread::execute()
    329 {
    330     unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
    331 
    332     work_load& load = m_parent->m_work_load;
    333 
    334     while(m_current_pos < load.m_blocks_count)
    335     {
    336         int start = load.m_range->start + m_current_pos*load.m_nstripes;
    337         int end = std::min(start + load.m_nstripes, load.m_range->end);
    338 
    339         load.m_body->operator()(cv::Range(start, end));
    340 
    341         m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
    342     }
    343 }
    344 
// Worker loop: waits for tasks announced through run() and executes them until
// stop() moves m_state away from eFTStarted.
void ForThread::thread_body()
{
    // mark this thread as a pool worker; ThreadManager::run() reads the flag and
    // executes the body inline instead of dispatching to the pool again
    m_parent->m_is_work_thread.get()->value = true;

    // the mutex is locked here and unlocked only after the loop has ended;
    // pthread_cond_wait() releases it while the thread is blocked
    pthread_mutex_lock(&m_thread_mutex);

    m_state = eFTStarted;

    while(m_state == eFTStarted)
    {
        //to handle spurious wakeups
        while( !m_task_start && m_state != eFTToStop )
            pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex);

        // nothing is executed when the wakeup was a stop request
        if(m_state == eFTStarted)
        {
            execute();

            m_task_start = false;

            // tell the manager that this worker has finished its part
            m_parent->notify_complete();
        }
    }

    pthread_mutex_unlock(&m_thread_mutex);
}
    371 
    372 ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited)
    373 {
    374     int res = 0;
    375 
    376     res |= pthread_mutex_init(&m_manager_task_mutex, NULL);
    377 
    378     res |= pthread_cond_init(&m_cond_thread_task_complete, NULL);
    379 
    380     if(!res)
    381     {
    382         setNumOfThreads(defaultNumberOfThreads());
    383 
    384         m_task_position = 0;
    385     }
    386     else
    387     {
    388         m_num_threads = 1;
    389         m_pool_state = eTMFailedToInit;
    390         m_task_position = 0;
    391 
    392         //print error;
    393     }
    394 }
    395 
// Stops the pool and destroys the manager's synchronization primitives.
ThreadManager::~ThreadManager()
{
    // stop() is static and reaches the manager through instance()
    stop();

    pthread_mutex_destroy(&m_manager_task_mutex);

    pthread_cond_destroy(&m_cond_thread_task_complete);

    // the access mutex is a static member shared by the whole class
    pthread_mutex_destroy(&m_manager_access_mutex);
}
    406 
    407 void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
    408 {
    409     bool is_work_thread = m_is_work_thread.get()->value;
    410 
    411     if( (getNumOfThreads() > 1) && !is_work_thread &&
    412         (range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) )
    413     {
    414         int res = pthread_mutex_trylock(&m_manager_access_mutex);
    415 
    416         if(!res)
    417         {
    418             if(initPool())
    419             {
    420                 double min_stripes = double(range.end - range.start)/(4*m_threads.size());
    421 
    422                 nstripes = std::max(nstripes, min_stripes);
    423 
    424                 pthread_mutex_lock(&m_manager_task_mutex);
    425 
    426                 m_num_of_completed_tasks = 0;
    427 
    428                 m_task_position = 0;
    429 
    430                 m_task_complete = false;
    431 
    432                 m_work_load.set(range, body, std::ceil(nstripes));
    433 
    434                 for(size_t i = 0; i < m_threads.size(); ++i)
    435                 {
    436                     m_threads[i].run();
    437                 }
    438 
    439                 wait_complete();
    440             }
    441             else
    442             {
    443                 //print error
    444                 body(range);
    445             }
    446         }
    447         else
    448         {
    449             body(range);
    450         }
    451     }
    452     else
    453     {
    454         body(range);
    455     }
    456 }
    457 
// Blocks until notify_complete() sets m_task_complete, then releases both
// m_manager_task_mutex and m_manager_access_mutex. The caller must hold both;
// ThreadManager::run() locks them before calling this function.
void ThreadManager::wait_complete()
{
    //to handle spurious wakeups
    while(!m_task_complete)
        pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex);

    pthread_mutex_unlock(&m_manager_task_mutex);

    // taken by run() with pthread_mutex_trylock()
    pthread_mutex_unlock(&m_manager_access_mutex);
}
    468 
    469 void ThreadManager::notify_complete()
    470 {
    471 
    472     unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1);
    473 
    474     if(comp == (m_num_threads - 1))
    475     {
    476         pthread_mutex_lock(&m_manager_task_mutex);
    477 
    478         m_task_complete = true;
    479 
    480         pthread_cond_signal(&m_cond_thread_task_complete);
    481 
    482         pthread_mutex_unlock(&m_manager_task_mutex);
    483     }
    484 }
    485 
    486 bool ThreadManager::initPool()
    487 {
    488     if(m_pool_state != eTMNotInited || m_num_threads == 1)
    489         return true;
    490 
    491     m_threads.resize(m_num_threads);
    492 
    493     bool res = true;
    494 
    495     for(size_t i = 0; i < m_threads.size(); ++i)
    496     {
    497         res |= m_threads[i].init(i, this);
    498     }
    499 
    500     if(res)
    501     {
    502         m_pool_state = eTMInited;
    503     }
    504     else
    505     {
    506         //TODO: join threads?
    507         m_pool_state = eTMFailedToInit;
    508     }
    509 
    510     return res;
    511 }
    512 
// Returns the configured number of threads (m_num_threads). The value is read
// without taking m_manager_access_mutex.
size_t ThreadManager::getNumOfThreads()
{
    return m_num_threads;
}
    517 
    518 void ThreadManager::setNumOfThreads(size_t n)
    519 {
    520     int res = pthread_mutex_lock(&m_manager_access_mutex);
    521 
    522     if(!res)
    523     {
    524         if(n == 0)
    525         {
    526             n = defaultNumberOfThreads();
    527         }
    528 
    529         if(n != m_num_threads && m_pool_state != eTMFailedToInit)
    530         {
    531             if(m_pool_state == eTMInited)
    532             {
    533                 stop();
    534                 m_threads.clear();
    535             }
    536 
    537             m_num_threads = n;
    538 
    539             if(m_num_threads == 1)
    540             {
    541                 m_pool_state = eTMSingleThreaded;
    542             }
    543             else
    544             {
    545                 m_pool_state = eTMNotInited;
    546             }
    547         }
    548 
    549         pthread_mutex_unlock(&m_manager_access_mutex);
    550     }
    551 }
    552 
    553 size_t ThreadManager::defaultNumberOfThreads()
    554 {
    555     unsigned int result = m_default_number_of_threads;
    556 
    557     char * env = getenv(m_env_name);
    558 
    559     if(env != NULL)
    560     {
    561         sscanf(env, "%u", &result);
    562 
    563         result = std::max(1u, result);
    564         //do we need upper limit of threads number?
    565     }
    566 
    567     return result;
    568 }
    569 
// Prototypes of the entry points defined below. NOTE(review): presumably they
// are repeated here because no header visible in this file declares them -
// confirm against the callers.
void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
size_t parallel_pthreads_get_threads_num();
void parallel_pthreads_set_threads_num(int num);
    573 
    574 size_t parallel_pthreads_get_threads_num()
    575 {
    576     return ThreadManager::instance().getNumOfThreads();
    577 }
    578 
    579 void parallel_pthreads_set_threads_num(int num)
    580 {
    581     if(num < 0)
    582     {
    583         ThreadManager::instance().setNumOfThreads(0);
    584     }
    585     else
    586     {
    587         ThreadManager::instance().setNumOfThreads(size_t(num));
    588     }
    589 }
    590 
    591 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
    592 {
    593     ThreadManager::instance().run(range, body, nstripes);
    594 }
    595 
    596 }
    597 
    598 #endif
    599