Home | History | Annotate | Download | only in sdk_util
      1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sdk_util/thread_pool.h"
      6 
      7 #include <pthread.h>
      8 #include <semaphore.h>
      9 #include <stdio.h>
     10 #include <stdlib.h>
     11 
     12 #include "sdk_util/auto_lock.h"
     13 
     14 namespace sdk_util {
     15 
     16 // Initializes mutex, semaphores and a pool of threads.  If 0 is passed for
     17 // num_threads, all work will be performed on the dispatch thread.
     18 ThreadPool::ThreadPool(int num_threads)
     19     : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
     20       user_data_(NULL), user_work_function_(NULL) {
     21   if (num_threads_ > 0) {
     22     int status;
     23     status = sem_init(&work_sem_, 0, 0);
     24     if (-1 == status) {
     25       fprintf(stderr, "Failed to initialize semaphore!\n");
     26       exit(-1);
     27     }
     28     status = sem_init(&done_sem_, 0, 0);
     29     if (-1 == status) {
     30       fprintf(stderr, "Failed to initialize semaphore!\n");
     31       exit(-1);
     32     }
     33     threads_ = new pthread_t[num_threads_];
     34     for (int i = 0; i < num_threads_; i++) {
     35       status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
     36       if (0 != status) {
     37         fprintf(stderr, "Failed to create thread!\n");
     38         exit(-1);
     39       }
     40     }
     41   }
     42 }
     43 
     44 // Post exit request, wait for all threads to join, and cleanup.
     45 ThreadPool::~ThreadPool() {
     46   if (num_threads_ > 0) {
     47     PostExitAndJoinAll();
     48     delete[] threads_;
     49     sem_destroy(&done_sem_);
     50     sem_destroy(&work_sem_);
     51   }
     52 }
     53 
     54 // Setup work parameters.  This function is called from the dispatch thread,
     55 // when all worker threads are sleeping.
     56 void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
     57   counter_ = counter;
     58   user_work_function_ = work;
     59   user_data_ = data;
     60 }
     61 
     62 // Return decremented task counter.  This function
     63 // can be called from multiple threads at any given time.
     64 int ThreadPool::DecCounter() {
     65   return AtomicAddFetch(&counter_, -1);
     66 }
     67 
     68 // Set exit flag, post and join all the threads in the pool.  This function is
     69 // called only from the dispatch thread, and only when all worker threads are
     70 // sleeping.
     71 void ThreadPool::PostExitAndJoinAll() {
     72   exiting_ = true;
     73   // Wake up all the sleeping worker threads.
     74   for (int i = 0; i < num_threads_; ++i)
     75     sem_post(&work_sem_);
     76   void* retval;
     77   for (int i = 0; i < num_threads_; ++i)
     78     pthread_join(threads_[i], &retval);
     79 }
     80 
     81 // Main work loop - one for each worker thread.
     82 void ThreadPool::WorkLoop() {
     83   while (true) {
     84     // Wait for work. If no work is availble, this thread will sleep here.
     85     sem_wait(&work_sem_);
     86     if (exiting_) break;
     87     while (true) {
     88       // Grab a task index to work on from the counter.
     89       int task_index = DecCounter();
     90       if (task_index < 0)
     91         break;
     92       user_work_function_(task_index, user_data_);
     93     }
     94     // Post to dispatch thread work is done.
     95     sem_post(&done_sem_);
     96   }
     97 }
     98 
     99 // pthread entry point for a worker thread.
    100 void* ThreadPool::WorkerThreadEntry(void* thiz) {
    101   static_cast<ThreadPool*>(thiz)->WorkLoop();
    102   return NULL;
    103 }
    104 
    105 // DispatchMany() will dispatch a set of tasks across worker threads.
    106 // Note: This function will block until all work has completed.
    107 void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) {
    108   // On entry, all worker threads are sleeping.
    109   Setup(num_tasks, work, data);
    110 
    111   // Wake up the worker threads & have them process tasks.
    112   for (int i = 0; i < num_threads_; i++)
    113     sem_post(&work_sem_);
    114 
    115   // Worker threads are now awake and busy.
    116 
    117   // This dispatch thread will now sleep-wait for the worker threads to finish.
    118   for (int i = 0; i < num_threads_; i++)
    119     sem_wait(&done_sem_);
    120   // On exit, all tasks are done and all worker threads are sleeping again.
    121 }
    122 
    123 //  DispatchHere will dispatch all tasks on this thread.
    124 void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) {
    125   for (int i = 0; i < num_tasks; i++)
    126     work(i, data);
    127 }
    128 
    129 // Dispatch() will invoke the user supplied work function across
    130 // one or more threads for each task.
    131 // Note: This function will block until all work has completed.
    132 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
    133   if (num_threads_ > 0)
    134     DispatchMany(num_tasks, work, data);
    135   else
    136     DispatchHere(num_tasks, work, data);
    137 }
    138 
    139 }  // namespace sdk_util
    140 
    141